kryo serializing of class in spark returns null while de-serialization


shootingfrog

Apr 15, 2014, 10:13:42 AM4/15/14
to kryo-...@googlegroups.com
Hi,

I am using the Java Spark API to write a test application. I am using a class that doesn't implement the Serializable interface, so to make the application work I am using the Kryo serializer to serialize it. The problem, which I observed while debugging, is that during deserialization the returned object comes back null, which in turn throws a NullPointerException. Can anyone guide me on where I am going wrong? Here is the code I am testing:



package org.apache.spark.examples;


import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.UnknownHostException;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;




/**
 * Spark application to test the Serialization issue in spark
 */
public class Test {

    static PrintWriter outputFileWriter;
    static FileWriter file;
    static JavaSparkContext ssc;

    public static void main(String[] args) {

        String inputFile = "/home/incubator-spark/examples/src/main/scala/org/apache/spark/examples/InputFile.txt";
        String master = "local";
        String jobName = "TestSerialization";
        String sparkHome = "/home/test/Spark_Installation/spark-0.7.0";
        String sparkJar = "/home/test/TestSerializationIssesInSpark/TestSparkSerIssueApp/target/TestSparkSerIssueApp-0.0.1-SNAPSHOT.jar";

        SparkConf conf = new SparkConf();
        conf.set("spark.closure.serializer", "org.apache.spark.serializer.KryoSerializer");
        conf.set("spark.kryo.registrator", "org.apache.spark.examples.MyRegistrator");

        // create the Spark context
        if (master.equals("local")) {
            ssc = new JavaSparkContext("local", jobName, conf);
            //ssc = new JavaSparkContext("local", jobName);
        } else {
            ssc = new JavaSparkContext(master, jobName, sparkHome, sparkJar);
        }

        JavaRDD<String> testData = ssc.textFile(inputFile).cache();
        final NotSerializableJavaClass notSerializableTestObject = new NotSerializableJavaClass("Hi ");

        @SuppressWarnings({ "serial", "unchecked" })
        JavaRDD<String> classificationResults = testData.map(
            new Function<String, String>() {
                @Override
                public String call(String inputRecord) throws Exception {
                    if (!inputRecord.isEmpty()) {
                        //String[] pointDimensions = inputRecord.split(",");
                        String result = "";
                        try {
                            FileWriter file = new FileWriter("/home/test/TestSerializationIssesInSpark/results/test_result_" + (int) (Math.random() * 100));
                            PrintWriter outputFile = new PrintWriter(file);
                            InetAddress ip = InetAddress.getLocalHost();
                            outputFile.println("IP of the server: " + ip);
                            result = notSerializableTestObject.testMethod(inputRecord);
                            outputFile.println("Result: " + result);

                            outputFile.flush();
                            outputFile.close();
                            file.close();
                        } catch (UnknownHostException e) {
                            e.printStackTrace();
                        } catch (IOException e1) {
                            e1.printStackTrace();
                        }
                        return result;
                    } else {
                        System.out.println("End of elements in the stream.");
                        String result = "End of elements in the input data";
                        return result;
                    }
                }
            }).cache();

        long processedRecords = classificationResults.count();

        ssc.stop();
        System.out.println("sssssssssss" + processedRecords);
    }
}




Here is the KryoRegistrator class:



package org.apache.spark.examples;

import org.apache.spark.serializer.KryoRegistrator;

import com.esotericsoftware.kryo.Kryo;

public class MyRegistrator implements KryoRegistrator {
    public void registerClasses(Kryo kryo) {
        kryo.register(NotSerializableJavaClass.class);
    }
}

Ian O'Connell

Apr 15, 2014, 10:34:55 AM4/15/14
to kryo-...@googlegroups.com
I didn't see the non-serializable class anywhere? That would be needed... A transient field in the class could easily cause this behavior.
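To make that transient-field point concrete, here is a small stand-alone sketch (the `Holder` class and `roundTrip` helper are invented for illustration) showing a transient field coming back null after a plain Java-serialization round trip; Kryo's default FieldSerializer skips transient fields as well:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientDemo {

    public static class Holder implements Serializable {
        String kept = "kept";
        transient String dropped = "dropped"; // transient fields are skipped during serialization
    }

    // Serialize to a byte array and read it back, as happens when an
    // object is shipped to another JVM.
    public static Holder roundTrip(Holder h) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(h);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (Holder) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Holder copy = roundTrip(new Holder());
        System.out.println(copy.kept + " / " + copy.dropped); // prints "kept / null"
    }
}
```

The field initializer does not re-run on the deserializing side, so `dropped` is null in the copy even though it was non-null when written.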
--
You received this message because you are subscribed to the "kryo-users" group.
http://groups.google.com/group/kryo-users
---
You received this message because you are subscribed to the Google Groups "kryo-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to kryo-users+...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

shootingfrog

Apr 15, 2014, 10:37:20 AM4/15/14
to kryo-...@googlegroups.com
Sorry, I forgot to paste that class here. Here it is:

package org.apache.spark.examples;

public class NotSerializableJavaClass {
    public String testVariable;

    public NotSerializableJavaClass(String testVariable) {
        super();
        this.testVariable = testVariable;
    }

    public String testMethod(String vartoAppend) {
        return this.testVariable + vartoAppend;
    }
}

A link to a similar problem on the Spark mailing list can be seen here:


Ian O'Connell

Apr 15, 2014, 1:31:25 PM4/15/14
to kryo-...@googlegroups.com
It ultimately sounds like it might be a Spark issue; I know they do some stuff about trying to null out elements of closures they think are unused. Maybe test that your class gets round-tripped with a plain Kryo example; after that it's probably a Spark issue, or potentially a Chill one.
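A minimal round-trip check along these lines (class and method names below are just illustrative) takes Spark out of the picture. One thing worth noting: stock Kryo's default instantiation needs a no-arg constructor, which the class in this thread doesn't define (Spark/Chill plug in Objenesis, so the requirement can differ there), so one is added here:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

public class KryoRoundTrip {

    // Same shape as the class in this thread, plus a no-arg constructor
    // so plain Kryo can instantiate it.
    public static class NotSerializableJavaClass {
        public String testVariable;

        public NotSerializableJavaClass() {}

        public NotSerializableJavaClass(String testVariable) {
            this.testVariable = testVariable;
        }
    }

    // Serialize with Kryo to a byte array, then deserialize a copy.
    public static NotSerializableJavaClass roundTrip(NotSerializableJavaClass obj) {
        Kryo kryo = new Kryo();
        kryo.register(NotSerializableJavaClass.class);

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        Output output = new Output(bytes);
        kryo.writeObject(output, obj);
        output.close();

        Input input = new Input(new ByteArrayInputStream(bytes.toByteArray()));
        NotSerializableJavaClass copy = kryo.readObject(input, NotSerializableJavaClass.class);
        input.close();
        return copy;
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(new NotSerializableJavaClass("Hi ")).testVariable);
    }
}
```

If this round trip preserves `testVariable`, Kryo itself is fine and the nulling happens somewhere in Spark's closure handling.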

If you just want to get past this, I think it should be easy to use Chill's Externalizer from Java: just box your object inside it (which makes it Java-serializable) and unbox it when you need it.
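The box/unbox idea can also be sketched without Chill at all: a Serializable wrapper that marks the wrapped value transient and writes its state by hand (the `Box` and `roundTrip` names below are made up, and this version is specialized to the one-field class in this thread; Chill's Externalizer does the same thing generically):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class BoxDemo {

    // The non-serializable class from the thread, unchanged.
    public static class NotSerializableJavaClass {
        public String testVariable;

        public NotSerializableJavaClass(String testVariable) {
            this.testVariable = testVariable;
        }

        public String testMethod(String vartoAppend) {
            return this.testVariable + vartoAppend;
        }
    }

    // Serializable box: the wrapped value is transient, and its state is
    // written by hand and rebuilt on the deserializing side.
    public static class Box implements Serializable {
        private transient NotSerializableJavaClass value;

        public Box(NotSerializableJavaClass value) { this.value = value; }

        public NotSerializableJavaClass get() { return value; }

        private void writeObject(ObjectOutputStream out) throws IOException {
            out.defaultWriteObject();
            out.writeUTF(value.testVariable); // write the wrapped state by hand
        }

        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            value = new NotSerializableJavaClass(in.readUTF()); // rebuild on deserialize
        }
    }

    // Round-trip through Java serialization, as Spark does when shipping a closure.
    public static NotSerializableJavaClass roundTrip(Box box) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(box);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return ((Box) in.readObject()).get();
        }
    }

    public static void main(String[] args) throws Exception {
        NotSerializableJavaClass copy = roundTrip(new Box(new NotSerializableJavaClass("Hi ")));
        System.out.println(copy.testMethod("there")); // prints "Hi there"
    }
}
```

The closure would then capture the `Box` (which is Serializable) and call `get()` inside `call(...)`.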



shootingfrog

Apr 18, 2014, 9:09:08 AM4/18/14
to kryo-...@googlegroups.com
Since I am new to this kind of serialization, I'm not sure I quite picked up your point about using Chill's Externalizer to box and unbox the object. Can you please elaborate on how to go about it?

Regards

