System.setProperty("spark.serializer", "spark.KryoSerializer")
System.setProperty("spark.kryo.registrator", "mypackage.MyRegistrator")12/12/14 13:12:06 INFO cluster.TaskSetManager: Loss was due to java.io.StreamCorruptedException: unexpected block data
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1360)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1963)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1887)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1770)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1346)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:368)
at java.util.ArrayList.readObject(ArrayList.java:696)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:616)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:988)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1865)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1770)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1346)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1963)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1887)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1770)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1346)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1963)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1887)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1770)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1346)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:368)
at spark.JavaDeserializationStream.readObject(JavaSerializer.scala:23)
at spark.JavaSerializerInstance.deserialize(JavaSerializer.scala:45)
at spark.executor.Executor$TaskRunner.run(Executor.scala:73)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.java:679)
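The registrator named above (mypackage.MyRegistrator) is not shown in the post; for context, a minimal sketch of what such a class typically looks like against the pre-0.8 spark.KryoRegistrator interface (the registered class is only a stand-in, not from the original code):

import com.esotericsoftware.kryo.Kryo;
import spark.KryoRegistrator;

// Minimal sketch of a registrator such as mypackage.MyRegistrator (not the poster's actual class).
public class MyRegistrator implements KryoRegistrator {
    public void registerClasses(Kryo kryo) {
        // Register each application class that travels inside RDD records,
        // e.g. kryo.register(MyRecord.class); ArrayList is shown here only as a stand-in.
        kryo.register(java.util.ArrayList.class);
    }
}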
HashMap<String, String> env = new HashMap<String, String>();
HashSet<String> jars = new HashSet<String>();
Configuration hadoopConf = new Configuration();
hadoopConf.set("fs.defaultFS", "hdfs://nncluster1");
...
...
HiveConf hiveConf = new HiveConf();
hiveConf.set("hive.metastore.warehouse.dir", "/hive")
System.setProperty("spark.serializer", spark.KryoSerializer.class.getName());
System.setProperty("spark.kryo.registrator", KryoRegistrator.class.getName());
MySharkContext spark = new MySharkContext("spark://master:7077", "myJob", "/usr/local/spark",
        scala.collection.JavaConversions.asScalaSet(jars).toSeq(),
        scala.collection.JavaConversions.asScalaMap(env), hadoopConf, hiveConf);
spark.repl("ret.value = sc.sql2rdd(\"select * from data limit 10\")");
RDD rdd = (RDD) spark.getReturn();
System.out.println("Count=" + rdd.count());
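The Return class bound into the interpreter as "ret" is not shown either; it is presumably just a simple holder along these lines (an assumption, not the posted code):

// Assumed shape of the Return holder ("ret") used to pass the RDD back out of the interpreter.
public class Return {
    public Object value;
}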
MySharkContext:
public class MySharkContext extends SparkContext {

    private Configuration hadoopConf;
    private HiveConf hiveConf;
    private Global global;
    private IMain interpreter;
    private Return ret;
    private SparkEnv sparkEnv;
    private SessionState sessionState;

    public MySharkContext(String master, String jobName, String sparkHome, Seq<String> jars,
            Map<String, String> env, Configuration hadoopConf, HiveConf hiveConf) {
        super(master, jobName, sparkHome, jars, env);
        this.hadoopConf = hadoopConf;
        this.hiveConf = hiveConf;

        // Capture the driver-side SparkEnv so it can be re-installed before running queries.
        sparkEnv = SparkEnv.get();

        sessionState = new SessionState(hiveConf);
        try {
            sessionState.out = new PrintStream(System.out, true, "UTF-8");
            sessionState.err = new PrintStream(System.err, true, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available; nothing to do here.
        }

        // Build an embedded Scala interpreter with the same classpath as this JVM.
        Settings settings = new Settings();
        String classpath = System.getProperty("java.class.path");
        for (String cp : classpath.split(":")) {
            settings.classpath().append(cp);
        }
        interpreter = new IMain(settings);

        // Expose this context and a result holder to interpreted code.
        ret = new Return();
        interpreter.bindValue("sc", this);
        interpreter.bindValue("ret", ret);
    }

    public RDD hadoopFile(String path, Class inputFormatClass, Class keyClass, Class valueClass, int minSplits) {
        JobConf conf = new JobConf(hadoopConf);
        FileInputFormat.setInputPaths(conf, path);
        String bufferSize = System.getProperty("spark.buffer.size", "65536");
        conf.set("io.file.buffer.size", bufferSize);
        return new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits);
    }

    public Object sql2rdd(String query) {
        SparkEnv.set(sparkEnv);
        SharkEnv.sc_$eq(this);
        SessionState.start(sessionState);
        SharkDriver driver = new SharkDriver(hiveConf);
        driver.init();
        return driver.tableRdd(query);
    }

    public Object getReturn() {
        return ret.value;
    }
}
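The repl(...) method called from the driver snippet is not part of the class as posted; presumably it just hands the statement to the embedded IMain interpreter, roughly like this (a sketch, not the original code):

// Sketch of the missing repl(...) method on MySharkContext (assumed, not shown in the post).
public void repl(String statement) {
    // "sc" and "ret" were bound in the constructor, so interpreted code can refer to them.
    interpreter.interpret(statement);
}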