Thanks for this suggestion; the idea gave us an expedient solution. Here's what the result looks like:
import org.foo.avro._
/**
 * `Externalizable` wrapper around an `enriched_event`.
 *
 * The wrapped event is written as a length-prefixed byte array produced by
 * `AvroSerde.encode` and rebuilt with `AvroSerde.decode`. The public
 * no-argument constructor is kept because `Externalizable` instances are
 * created through it before `readExternal` is called.
 */
class SparkEnrichedEvent() extends Externalizable {
  /** The wrapped event; `null` until set by a constructor or `readExternal`. */
  var event: enriched_event = null

  // Never assigned inside this class; retained only so that existing code
  // referring to the member keeps compiling.
  var bytes: Array[Byte] = null

  /**
   * Builds the wrapper from an Avro-encoded event.
   *
   * @param m bytes that `AvroSerde.decode` can turn into an `enriched_event`
   */
  def this(m: Array[Byte]) = {
    this()
    this.event = AvroSerde.decode(m)
  }

  /**
   * De-serializes the event: reads an `Int` length followed by that many bytes.
   *
   * @param in stream positioned at data written by `writeExternal`
   */
  def readExternal(in: ObjectInput): Unit = {
    val byteCnt = in.readInt()
    val payload = new Array[Byte](byteCnt)
    // read() may return after filling only part of the buffer; readFully
    // blocks until every byte has arrived or throws EOFException.
    in.readFully(payload)
    event = AvroSerde.decode(payload)
  }

  /**
   * Serializes the event as an `Int` length followed by the encoded bytes.
   *
   * @param out stream that receives the length prefix and payload
   */
  def writeExternal(out: ObjectOutput): Unit = {
    val payload = AvroSerde.encode(event)
    out.writeInt(payload.length)
    out.write(payload)
  }
}
AvroSerde is a singleton object that handles serialization and deserialization (SerDe) for our enriched_event object:
import org.foo.avro._
// Avro serde support
/**
 * Encodes and decodes `enriched_event` values with Avro's reflection-based
 * datum reader and writer, using Avro's binary encoding.
 *
 * The schema, reader and writer are created lazily on first use and then
 * shared by every call.
 */
object AvroSerde {
  /** Schema obtained by reflection from the `enriched_event` class. */
  lazy val schema: Schema = ReflectData.get().getSchema(classOf[enriched_event])

  lazy val reflectDatumReader: ReflectDatumReader[enriched_event] =
    new ReflectDatumReader[enriched_event](schema)

  lazy val reflectDatumWriter: ReflectDatumWriter[enriched_event] =
    new ReflectDatumWriter[enriched_event](schema)

  /**
   * Returns the reflected schema.
   *
   * The previous definition used procedure syntax (`def getSchema() { ... }`),
   * which makes the result type `Unit`, so the schema was silently discarded.
   *
   * @return the schema used by the reader and writer
   */
  def getSchema(): Schema = schema

  /**
   * Serializes an event to Avro binary.
   *
   * @param value the event to encode
   * @return the encoded bytes
   */
  def encode(value: enriched_event): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    // Passing null asks the factory for a new encoder instead of reusing one.
    val encoder = EncoderFactory.get().binaryEncoder(buffer, null)
    reflectDatumWriter.write(value, encoder)
    // The encoder buffers its output; flush before taking the bytes.
    encoder.flush()
    buffer.toByteArray()
  }

  /**
   * De-serializes an event from Avro binary.
   *
   * @param value bytes previously produced by `encode`
   * @return the decoded event
   */
  def decode(value: Array[Byte]): enriched_event = {
    val decoder = DecoderFactory.get().binaryDecoder(value, null)
    // The reader is already typed to enriched_event, so no cast is needed.
    reflectDatumReader.read(null, decoder)
  }
}
Thanks for this suggestion. It's simple and works well, and it should be performant because it avoids Java serialization.
Craig