"unread block data" on spark standalone job


Tyson Hamilton

Dec 11, 2012, 10:42:46 AM
to spark...@googlegroups.com
Hi,

I'm running a job using a Hadoop InputFormat and newAPIHadoopRDD. The job uses Hector (a Java API) to retrieve data from a Cassandra cluster. It works well with Hadoop, and I've had it working in Spark's local mode, but when I try to run it against a standalone master/slave on the same localhost I get the following exception:

java.lang.IllegalStateException: unread block data
at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2376)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1360)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
at spark.JavaDeserializationStream.readObject(JavaSerializer.scala:23)
at spark.JavaSerializerInstance.deserialize(JavaSerializer.scala:45)
at spark.executor.Executor$TaskRunner.run(Executor.scala:73)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)


Any ideas?

-Tyson

Tyson Hamilton

Dec 12, 2012, 3:35:02 PM
to spark...@googlegroups.com
So I'm not entirely sure why this error was happening, but it clearly involved the JavaSerializerInstance. If anyone else runs into a similar problem, I'd suggest switching to the KryoSerializer, as outlined in the configuration guide on the Spark homepage:

System.setProperty("spark.serializer", "spark.KryoSerializer")
System.setProperty("spark.kryo.registrator", "mypackage.MyRegistrator")

This eliminated the exception and my job is running great. Also the KryoSerializer is much faster! Bonus!
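
For reference, the registrator named by spark.kryo.registrator is just a small class that registers your types with Kryo, along the lines of what the tuning guide shows. The package, class, and record type below are placeholders for illustration, not the actual classes from my job:

import com.esotericsoftware.kryo.Kryo
import spark.KryoRegistrator

// Hypothetical record type standing in for whatever your tasks actually ship around.
case class MyRecord(key: String, value: String)

// Placeholder registrator: register the classes your job actually serializes.
class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[MyRecord])
    kryo.register(classOf[Array[MyRecord]])
  }
}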

-Tyson

moon soo Lee

Dec 13, 2012, 11:38:19 PM
to spark...@googlegroups.com
Hi.

I'm also trying to do the same thing.
I also set the "spark.serializer" and "spark.kryo.registrator" properties before I create the SparkContext.

I set "spark.kryo.registrator" to "shark.KryoRegistrator".

However, my code generates the same exception, even in "local" mode.

And the strange thing is Executor.scala:73: I think that code always uses the JavaSerializer. It isn't controlled by spark.serializer but by spark.closure.serializer, which defaults to the JavaSerializer.

Can you guys help me?
I've really tried many things, but nothing works.

Here's my exception:

12/12/14 13:12:06 INFO cluster.TaskSetManager: Loss was due to java.io.StreamCorruptedException: unexpected block data
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1360)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1963)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1887)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1770)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1346)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:368)
at java.util.ArrayList.readObject(ArrayList.java:696)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:616)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:988)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1865)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1770)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1346)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1963)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1887)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1770)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1346)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1963)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1887)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1770)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1346)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:368)
at spark.JavaDeserializationStream.readObject(JavaSerializer.scala:23)
at spark.JavaSerializerInstance.deserialize(JavaSerializer.scala:45)
at spark.executor.Executor$TaskRunner.run(Executor.scala:73)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.java:679)

Reynold Xin

Dec 14, 2012, 1:05:05 AM
to spark...@googlegroups.com
How did you do the setting? It seems like on the slaves, the Java serializer is being used instead of the Kryo serializer.
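
One way to check that is to read the property from inside a task. Assuming the 0.6-era setup, where each executor builds its SparkEnv from its own JVM system properties, the value the tasks report is the serializer the executors will pick up. This is only a rough diagnostic sketch, with "sc" standing for the already-created context:

// Rough diagnostic sketch (assumes executors resolve spark.serializer from
// their own system properties, as in the 0.6-era SparkEnv). "sc" is the
// already-created SparkContext; when the property is unset, the default is
// spark.JavaSerializer.
val seen = sc.parallelize(1 to 8)
  .map(_ => System.getProperty("spark.serializer", "spark.JavaSerializer (default)"))
  .collect()
  .toSet
seen.foreach(println)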

moon soo Lee

Dec 14, 2012, 1:51:43 AM
to spark...@googlegroups.com
My standalone master/worker cluster works fine (with Shark).

I manually created a SparkContext in my standalone Java application.

My Java application looks like:

main:

HashMap<String, String> env = new HashMap<String, String>();
HashSet<String> jars = new HashSet<String>();

Configuration hadoopConf = new Configuration();
hadoopConf.set("fs.defaultFS", "hdfs://nncluster1");
...
...

HiveConf hiveConf = new HiveConf();
hiveConf.set("hive.metastore.warehouse.dir", "/hive");

System.setProperty("spark.serializer", spark.KryoSerializer.class.getName());
System.setProperty("spark.kryo.registrator", KryoRegistrator.class.getName());

MySharkContext shark = new MySharkContext("spark://master:7077", "myJob", "/usr/local/spark",
    scala.collection.JavaConversions.asScalaSet(jars).toSeq(),
    scala.collection.JavaConversions.asScalaMap(env), hadoopConf, hiveConf);
shark.repl("ret.value = sc.sql2rdd(\"select * from data limit 10\")");
RDD rdd = (RDD) shark.getReturn();
System.out.println("Count=" + rdd.count());

MySharkContext:

public class MySharkContext extends SparkContext {

  private Configuration hadoopConf;
  private HiveConf hiveConf;
  private Global global;
  private IMain interpreter;
  private Return ret;
  private SparkEnv sparkEnv;
  private SessionState sessionState;

  public MySharkContext(String master, String jobName, String sparkHome, Seq<String> jars,
      Map<String, String> env, Configuration hadoopConf, HiveConf hiveConf) {
    super(master, jobName, sparkHome, jars, env);
    this.hadoopConf = hadoopConf;
    this.hiveConf = hiveConf;

    sparkEnv = SparkEnv.get();
    sessionState = new SessionState(hiveConf);
    try {
      sessionState.out = new PrintStream(System.out, true, "UTF-8");
      sessionState.err = new PrintStream(System.err, true, "UTF-8");
    } catch (UnsupportedEncodingException e) {
    }

    Settings settings = new Settings();
    String classpath = System.getProperty("java.class.path");
    for (String cp : classpath.split(":")) {
      settings.classpath().append(cp);
    }

    interpreter = new IMain(settings);

    ret = new Return();
    interpreter.bindValue("sc", this);
    interpreter.bindValue("ret", ret);
  }

  public RDD hadoopFile(String path, Class inputFormatClass, Class keyClass, Class valueClass, int minSplits) {
    JobConf conf = new JobConf(hadoopConf);
    FileInputFormat.setInputPaths(conf, path);
    String bufferSize = System.getProperty("spark.buffer.size", "65536");
    conf.set("io.file.buffer.size", bufferSize);
    return new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits);
  }

  public Object sql2rdd(String query) {
    SparkEnv.set(sparkEnv);
    SharkEnv.sc_$eq(this);
    SessionState.start(sessionState);

    SharkDriver driver = new SharkDriver(hiveConf);
    driver.init();
    return driver.tableRdd(query);
  }

  public Object getReturn() {
    return ret.value;
  }
}

What I'm trying to do with my code is run a query from a Java application,
with HadoopConf and HiveConf set programmatically.

But the code hits the exception "StreamCorruptedException: unexpected block data", originating from the JavaSerializer.
[Message has been deleted]

moon soo Lee

Dec 14, 2012, 10:26:54 PM
to spark...@googlegroups.com
Thanks for the reply.

What kind of classes should I register in 'YourKryoRegistrator'? Can't I
just use shark.KryoRegistrator?

On Dec 14, 2012, at 11:39 PM, Tyson Hamilton
<tyson.j....@gmail.com> wrote:

> YourKryoRegistrator

Tyson Hamilton

Dec 17, 2012, 11:41:48 AM
to spark...@googlegroups.com
Hi,

I deleted my post because I saw that you are using the Shark KryoRegistrator, which I am unfamiliar with. If you are trying to operate within the realm of Shark and are following their pattern, then that should suffice. If you are creating your own objects, classes, or closures, then you need to register them yourself in a similar fashion to how Shark does.

Perhaps it would be best if you tried to simplify your approach, get the basics working and then build up to what you are trying to do.
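
For example, a bare-bones standalone job along these lines is a quick way to confirm the Kryo setup works against your cluster before layering the Shark/Hive pieces back on. The master URL, Spark home, jar path, and registrator name here are all placeholders, not something from your actual setup:

import spark.SparkContext

object KryoSmokeTest {
  def main(args: Array[String]) {
    // Set the serializer properties before the context is created.
    System.setProperty("spark.serializer", "spark.KryoSerializer")
    System.setProperty("spark.kryo.registrator", "mypackage.MyRegistrator")

    // Placeholder master URL, Spark home, and application jar.
    val sc = new SparkContext("spark://master:7077", "kryoSmokeTest",
      "/usr/local/spark", Seq("target/my-app.jar"))

    // A trivial job: if this completes without the "unread block data"
    // error, the serializer wiring itself is working on the cluster.
    println(sc.parallelize(1 to 1000).map(_ * 2).reduce(_ + _))

    sc.stop()
  }
}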

-Tyson

Jerry Shao

Aug 2, 2013, 5:45:54 AM
to spark...@googlegroups.com
Hi Tyson,

I met the same exception as yours when using a customized input format to create a Hadoop RDD. Would you please share your experience of how you solved it? I saw your post saying that Kryo can solve the problem, but how do I judge which classes should be serialized by Kryo? Should I serialize all classes with Kryo?

Thanks
Jerry 

On Tuesday, December 18, 2012 at 12:41:48 AM UTC+8, Tyson Hamilton wrote: