Simple example of using the Kryo serializer when reading/writing objects to/from disk


Ian O'Connell

Apr 25, 2013, 6:43:09 PM
to spark...@googlegroups.com
I wanted to use Kryo for all file serialization (the issue was forced by random ClassNotFound errors with the Java serializer, despite the class being in the JAR), so here's a simple bit of code to drop in until the serialization options affect objectFile/writeObjectFile:


example usage:

KryoFile.write("s3://mypath", myOutputRDD)

KryoFile.read[MyTypeInTheRDD](sc, "s3://mypath")
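
One thing to note: the helpers below cast SparkEnv.get.serializer to KryoSerializer, so Kryo has to already be the configured Spark serializer before the context is created. A minimal setup sketch, assuming the 0.7-era system-property style of configuration (the master/app name and the registrator class here are placeholders, not from the original post):

import spark.SparkContext

// Switch Spark's serializer to Kryo before creating the SparkContext, so that
// SparkEnv.get.serializer is a KryoSerializer and the casts in KryoFile succeed.
System.setProperty("spark.serializer", "spark.KryoSerializer")
// Optionally register your classes for more compact output (MyRegistrator is hypothetical).
// System.setProperty("spark.kryo.registrator", "com.example.MyRegistrator")

val sc = new SparkContext("local", "kryo-file-example")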

Code:

object KryoFile {

  import org.apache.hadoop.io.NullWritable
  import org.apache.hadoop.io.BytesWritable
  import spark.SparkEnv
  import spark.KryoSerializer
  import spark.SparkContext._
  import spark.SparkContext

  // Thin wrapper around Spark's thread-local Kryo instances that turns a single
  // object into a byte array and back.
  private class KryoByteSerializerInstance(ks: KryoSerializer) {

    val kryo = ks.kryo.get()
    val output = ks.output.get()
    val input = ks.input.get()

    def serialize[T](t: T): Array[Byte] = {
      output.clear()
      kryo.writeClassAndObject(output, t)
      output.toBytes
    }

    def deserialize[T](bytes: Array[Byte]): T = {
      input.setBuffer(bytes)
      kryo.readClassAndObject(input).asInstanceOf[T]
    }
  }

  private def kryoSerialize[R](in: R): Array[Byte] = {
    // Point Kryo at the context classloader so user classes shipped in the job JAR resolve.
    SparkEnv.get.serializer.asInstanceOf[KryoSerializer].kryo.get().setClassLoader(Thread.currentThread().getContextClassLoader)
    val serializer = new KryoByteSerializerInstance(SparkEnv.get.serializer.asInstanceOf[KryoSerializer])
    serializer.serialize(in)
  }

  private def kryoDeserialize[T](byteArray: Array[Byte]): T = {
    SparkEnv.get.serializer.asInstanceOf[KryoSerializer].kryo.get().setClassLoader(Thread.currentThread().getContextClassLoader)
    val serializer = new KryoByteSerializerInstance(SparkEnv.get.serializer.asInstanceOf[KryoSerializer])
    serializer.deserialize[T](byteArray)
  }

  // Batch records into arrays of 10, Kryo-serialize each batch, and store the bytes
  // as BytesWritable values in a SequenceFile (keys are NullWritable).
  def write[R: ClassManifest](path: String, in: spark.RDD[R]) {
    in.mapPartitions(iter => iter.grouped(10).map(_.toArray))
      .map(t => (NullWritable.get(), new BytesWritable(kryoSerialize(t))))
      .saveAsSequenceFile(path)
  }

  // Read the SequenceFile back and flatten each Kryo-serialized batch into individual records.
  def read[T: ClassManifest](sc: SparkContext, path: String): spark.RDD[T] = {
    sc.sequenceFile(path, classOf[NullWritable], classOf[BytesWritable])
      .flatMap { x => kryoDeserialize[Array[T]](x._2.getBytes) }
  }
}
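
And a quick end-to-end sketch of the round trip, just to show the two helpers together (MyRecord and the path are placeholders, not from the original post):

case class MyRecord(id: Int, name: String)

val records = sc.parallelize(Seq(MyRecord(1, "a"), MyRecord(2, "b")))
KryoFile.write("hdfs:///tmp/records-kryo", records)

val restored: spark.RDD[MyRecord] = KryoFile.read[MyRecord](sc, "hdfs:///tmp/records-kryo")
println(restored.count())   // should print 2

Records are batched ten at a time into arrays before serializing, so each SequenceFile value holds a block of Kryo bytes rather than a single record, which cuts down on per-record overhead.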
