Spark Streaming and Avro Objects


Craig Vanderborgh

Mar 22, 2013, 3:06:28 PM
to spark...@googlegroups.com
Hi All:

I have the following code in my Spark Streaming program:

  events.foreach(rdd => {
      if (rdd.count() != 0) {
        rdd.collect.foreach(e => dump(e))
      }
    })

where events is of type DStream[enriched_event], enriched_event is a deserialized Avro object, and dump() is simply this:

  def dump(e: enriched_event) {
    var command = e.getCommandData();
    println("commandName: \"" + command.getCommandName() + "\"")
    println("eventIdentifier: \"" + command.getEventIdentifier() + "\"")
    var event_meta = command.getEventMetaData()
    var time_stamp = command.getTimeStampData()
  }

When I run this I get NotSerializableExceptions:

13/03/22 12:57:10 ERROR LocalScheduler: Exception in task 0
java.io.NotSerializableException: org.foo.avro.enriched_event
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1164)
        at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1346)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1154)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:330)
        at spark.JavaSerializationStream.writeObject(JavaSerializer.scala:11)
        at spark.JavaSerializerInstance.serialize(JavaSerializer.scala:31)
        at spark.scheduler.local.LocalScheduler.runTask$1(LocalScheduler.scala:80)
        at spark.scheduler.local.LocalScheduler$$anon$1.run(LocalScheduler.scala:50)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
        at java.util.concurrent.FutureTask.run(FutureTask.java:138)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:662)

This looks to me like it's trying to serialize my Avro objects with Java serialization.  Why is it doing this, and what can I do to make this work?

Any/all ideas appreciated!

Craig Vanderborgh

Tathagata Das

Mar 22, 2013, 3:40:30 PM
to spark...@googlegroups.com
The default serializer used by Spark is Java serialization, so for all Java types (including the Avro enriched_event) it will try to serialize with Java serialization. You can also use KryoSerializer, or plug in your own custom serialization (like Avro). You can read more about serialization here: http://spark-project.org/docs/latest/tuning.html
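A minimal sketch of what that looks like with the Spark 0.7-era configuration described in the tuning guide (the registrator name here is illustrative):

import com.esotericsoftware.kryo.Kryo
import spark.KryoRegistrator

// Illustrative registrator: tell Kryo about the generated Avro class.
class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[enriched_event])
  }
}

// Set these before the SparkContext / StreamingContext is created:
System.setProperty("spark.serializer", "spark.KryoSerializer")
System.setProperty("spark.kryo.registrator", "MyRegistrator")

Whether Kryo handles the generated Avro class out of the box is a separate question; the wrapper approach discussed below sidesteps that.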

TD




Patrick Wendell

Mar 22, 2013, 3:41:38 PM
to spark...@googlegroups.com
Hey,

Spark uses Java serialization; it doesn't use Avro natively (we need it
to work on any type of Java object). Avro's classes don't implement
Serializable, so they won't work out of the box.

You can work around this by wrapping your objects in something that is
externalizable. Take a look at the source code for the Flume Input
DStream, it has to do this as well (by creating a wrapper for
AvroFlumeEvent).

We should probably just write an implicit conversion that does this
wrapping automatically. That would make it "just work". I won't be able
to do that for a week or so, so check out the work-around in the
meantime.

- Patrick


Patrick Wendell

Mar 22, 2013, 3:45:40 PM
to spark...@googlegroups.com
The code we used for flume events is in this file:

https://github.com/mesos/spark/blob/master/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala

This actually exploits the specific fields in the FlumeEvent schema.
You could write a more general wrapper that just calls Avro's
serialization to populate a byte buffer, and on the receiving end uses
Avro to deserialize it. That would work on any Avro type.
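A rough sketch of that kind of generic wrapper (this is not the actual Flume code; class and field names are illustrative, and it assumes Avro-generated "specific" record classes):

import java.io.{ByteArrayOutputStream, Externalizable, ObjectInput, ObjectOutput}

import org.apache.avro.io.{DecoderFactory, EncoderFactory}
import org.apache.avro.specific.{SpecificDatumReader, SpecificDatumWriter, SpecificRecord}

// Generic Externalizable wrapper: the record's class name is written alongside
// the Avro bytes so the receiving side can build the right reader.
class AvroExternalizable[T <: SpecificRecord](var record: T) extends Externalizable {
  def this() = this(null.asInstanceOf[T])

  def writeExternal(out: ObjectOutput) {
    out.writeUTF(record.getClass.getName)
    val baos = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(baos, null)
    new SpecificDatumWriter[T](record.getSchema).write(record, encoder)
    encoder.flush()
    val bytes = baos.toByteArray
    out.writeInt(bytes.length)
    out.write(bytes)
  }

  def readExternal(in: ObjectInput) {
    val clazz = Class.forName(in.readUTF()).asInstanceOf[Class[T]]
    val bytes = new Array[Byte](in.readInt())
    in.readFully(bytes)
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    record = new SpecificDatumReader[T](clazz).read(null.asInstanceOf[T], decoder)
  }
}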

- Patrick

Craig Vanderborgh

Mar 25, 2013, 11:25:05 AM
to spark...@googlegroups.com
Hi Patrick:

Thanks for this suggestion; it provided us with an expedient solution. Here's what the result looks like:

import java.io.{Externalizable, ObjectInput, ObjectOutput}

import org.foo.avro._

class SparkEnrichedEvent() extends Externalizable {
  var event: enriched_event = null

  def this(m: Array[Byte]) {
    this()
    this.event = AvroSerde.decode(m)
  }

  /* De-serialize from bytes. */
  def readExternal(in: ObjectInput) {
    val byteCnt = in.readInt()
    val bytes = new Array[Byte](byteCnt)
    in.readFully(bytes)
    event = AvroSerde.decode(bytes)
  }

  /* Serialize to bytes. */
  def writeExternal(out: ObjectOutput) {
    val bytes = AvroSerde.encode(event)
    out.writeInt(bytes.length)
    out.write(bytes)
  }
}

and AvroSerde is a singleton object tasked with SerDe for our enriched_event type:

import java.io.ByteArrayOutputStream

import org.apache.avro.Schema
import org.apache.avro.io.{DecoderFactory, EncoderFactory}
import org.apache.avro.reflect.{ReflectData, ReflectDatumReader, ReflectDatumWriter}

import org.foo.avro._

// Avro serde support
object AvroSerde {
  lazy val schema = ReflectData.get().getSchema(classOf[enriched_event])
  lazy val reflectDatumReader = new ReflectDatumReader[enriched_event](schema)
  lazy val reflectDatumWriter = new ReflectDatumWriter[enriched_event](schema)

  def getSchema(): Schema = schema

  def encode(value: enriched_event): Array[Byte] = {
    val outputStream = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(outputStream, null)
    reflectDatumWriter.write(value, encoder)
    encoder.flush()
    outputStream.toByteArray()
  }

  def decode(value: Array[Byte]): enriched_event = {
    val decoder = DecoderFactory.get().binaryDecoder(value, null)
    reflectDatumReader.read(null, decoder).asInstanceOf[enriched_event]
  }
}
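
Usage then looks roughly like this, assuming the input DStream carries the raw Avro-encoded byte arrays (the rawBytes name is illustrative):

// rawBytes: DStream[Array[Byte]] from whatever source feeds the Avro-encoded records
val events = rawBytes.map(b => new SparkEnrichedEvent(b))

events.foreach(rdd => {
    if (rdd.count() != 0) {
      rdd.collect.foreach(w => dump(w.event))
    }
  })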

Thanks for this suggestion. It's simple, works well, and should be performant because the event payload goes through Avro rather than default field-by-field Java serialization.

Craig