// Example usage: persist an existing RDD (myOutputRDD here) as a Kryo-serialized SequenceFile.
KryoFile.write("s3://mypath", myOutputRDD)

object KryoFile {
  import org.apache.hadoop.io.NullWritable
  import org.apache.hadoop.io.BytesWritable
  import spark.SparkEnv
  import spark.KryoSerializer
  import spark.SparkContext._
  import spark.SparkContext

  // Serializes a single object to/from a byte array using the thread-local
  // Kryo instance and buffers obtained from Spark's KryoSerializer.
  private class KryoByteSerializerInstance(ks: KryoSerializer) {
    val kryo = ks.kryo.get()
    val output = ks.output.get()
    val input = ks.input.get()

    def serialize[T](t: T): Array[Byte] = {
      output.clear()
      kryo.writeClassAndObject(output, t)
      output.toBytes
    }

    def deserialize[T](bytes: Array[Byte]): T = {
      input.setBuffer(bytes)
      kryo.readClassAndObject(input).asInstanceOf[T]
    }
  }

  // Grab the KryoSerializer registered in the current SparkEnv, point its Kryo
  // instance at the task's class loader, and serialize one value to bytes.
  private def kryoSerialize[R](in: R): Array[Byte] = {
    SparkEnv.get.serializer.asInstanceOf[KryoSerializer].kryo.get()
      .setClassLoader(Thread.currentThread().getContextClassLoader)
    val serializer = new KryoByteSerializerInstance(SparkEnv.get.serializer.asInstanceOf[KryoSerializer])
    serializer.serialize(in)
  }

  private def kryoDeserialize[T](byteArray: Array[Byte]): T = {
    SparkEnv.get.serializer.asInstanceOf[KryoSerializer].kryo.get()
      .setClassLoader(Thread.currentThread().getContextClassLoader)
    val serializer = new KryoByteSerializerInstance(SparkEnv.get.serializer.asInstanceOf[KryoSerializer])
    serializer.deserialize[T](byteArray)
  }

  // Batch records in groups of 10, Kryo-serialize each batch, and store it as
  // the value of a (NullWritable, BytesWritable) SequenceFile record.
  def write[R: ClassManifest](path: String, in: spark.RDD[R]) {
    in.mapPartitions(iter => iter.grouped(10).map(_.toArray))
      .map(t => (NullWritable.get(), new BytesWritable(kryoSerialize(t))))
      .saveAsSequenceFile(path)
  }

  // Read the SequenceFile back and flatten each Kryo-serialized batch into
  // individual records. Kryo reads only the bytes it needs, so any trailing
  // padding returned by BytesWritable.getBytes is ignored.
  def read[T: ClassManifest](sc: SparkContext, path: String): spark.RDD[T] = {
    sc.sequenceFile(path, classOf[NullWritable], classOf[BytesWritable])
      .flatMap { x => kryoDeserialize[Array[T]](x._2.getBytes) }
  }
}
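
Reading the data back requires the element type at the call site, since read deserializes each SequenceFile record as an Array[T]. Below is a minimal round-trip sketch, assuming an existing SparkContext named sc and a hypothetical Record case class; registering Record with your Kryo registrator keeps the class-and-object encoding compact, but is not required.

// Hypothetical element type used only for illustration.
case class Record(id: Int, value: Double)

// Write the RDD out, then read it back with the same element type.
val records: spark.RDD[Record] = sc.parallelize(Seq(Record(1, 0.5), Record(2, 1.5)))
KryoFile.write("s3://mypath", records)
val restored: spark.RDD[Record] = KryoFile.read[Record](sc, "s3://mypath")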