AvroRuntimeException serializing Avro parsed messages

Ben Vogan

Jun 28, 2016, 11:08:26 PM
to Druid User
Hi all,

I am trying to use the Avro extension to parse Avro messages stored in Kafka. I'm afraid my understanding of the Tranquility/Druid + Jackson stack is still a little shaky, so I don't understand the source of the error I am running into (see below). The schema is successfully retrieved from the schema repo and the message is parsed, but it looks like the schema is embedded in the returned rows, and serializing those rows fails because of it. I don't know whether this is a problem with my schema definition, the Avro/Jackson interaction, or the Avro extension. Can anyone here point me in the right direction?

The schema is quite large, so I have not included the whole thing here, but this is the snippet defining the field that fails to serialize (pulled from the schema repo):

{
  "name" : "device_platform_type",
  "type" : [ "null", {
    "type" : "enum",
    "name" : "device_platform_type_enum",
    "symbols" : [ "ios", "android", "web" ]
  } ],
  "default" : null
}


And here is the exception:

2016-06-29 02:17:54,488 [KafkaConsumer-0] ERROR c.m.tranquility.kafka.KafkaConsumer - Exception:
java.lang.RuntimeException: com.fasterxml.jackson.databind.JsonMappingException: Not a map: {"type":"enum","name":"device_platform_type_enum","namespace":"com.shopkick.data","symbols":["ios","android","web"]} (through reference chain: org.apache.avro.generic.EnumSymbol["schema"]->org.apache.avro.EnumSchema["valueType"])
        at com.google.common.base.Throwables.propagate(Throwables.java:160) ~[com.google.guava.guava-16.0.1.jar:na]
        at com.metamx.tranquility.kafka.writer.TranquilityEventWriter.maybeThrow(TranquilityEventWriter.java:138) ~[io.druid.tranquility-kafka-0.8.0.jar:0.8.0]
        at com.metamx.tranquility.kafka.writer.TranquilityEventWriter.send(TranquilityEventWriter.java:105) ~[io.druid.tranquility-kafka-0.8.0.jar:0.8.0]
        at com.metamx.tranquility.kafka.KafkaConsumer$2.run(KafkaConsumer.java:231) ~[io.druid.tranquility-kafka-0.8.0.jar:0.8.0]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_101]
        at java.util.concurrent.FutureTask.run(FutureTask.java:262) [na:1.7.0_101]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_101]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_101]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_101]
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Not a map: {"type":"enum","name":"device_platform_type_enum","namespace":"com.shopkick.data","symbols":["ios","android","web"]} (through reference chain: org.apache.avro.generic.EnumSymbol["schema"]->org.apache.avro.EnumSchema["valueType"])
        at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:210) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]
        at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:177) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]
        at com.fasterxml.jackson.databind.ser.std.StdSerializer.wrapAndThrow(StdSerializer.java:187) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]
        at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:647) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]
        at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:152) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]
        at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:505) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]
        at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:639) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]
        at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:152) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]
        at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:128) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]
        at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:1902) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]
        at com.fasterxml.jackson.core.base.GeneratorBase.writeObject(GeneratorBase.java:280) ~[com.fasterxml.jackson.core.jackson-core-2.4.6.jar:2.4.6]
        at com.fasterxml.jackson.core.JsonGenerator.writeObjectField(JsonGenerator.java:1255) ~[com.fasterxml.jackson.core.jackson-core-2.4.6.jar:2.4.6]
        at com.metamx.tranquility.druid.input.InputRowObjectWriter$$anonfun$com$metamx$tranquility$druid$input$InputRowObjectWriter$$writeJson$2.apply(InputRowObjectWriter.scala:86) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]
        at com.metamx.tranquility.druid.input.InputRowObjectWriter$$anonfun$com$metamx$tranquility$druid$input$InputRowObjectWriter$$writeJson$2.apply(InputRowObjectWriter.scala:84) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]
        at scala.collection.Iterator$class.foreach(Iterator.scala:742) ~[org.scala-lang.scala-library-2.11.7.jar:na]
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1194) ~[org.scala-lang.scala-library-2.11.7.jar:na]
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) ~[org.scala-lang.scala-library-2.11.7.jar:na]
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[org.scala-lang.scala-library-2.11.7.jar:na]
        at com.metamx.tranquility.druid.input.InputRowObjectWriter.com$metamx$tranquility$druid$input$InputRowObjectWriter$$writeJson(InputRowObjectWriter.scala:84) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]
        at com.metamx.tranquility.druid.input.InputRowObjectWriter$$anonfun$batchAsBytes$1.apply(InputRowObjectWriter.scala:67) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]
        at com.metamx.tranquility.druid.input.InputRowObjectWriter$$anonfun$batchAsBytes$1.apply(InputRowObjectWriter.scala:67) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]
        at scala.collection.Iterator$class.foreach(Iterator.scala:742) ~[org.scala-lang.scala-library-2.11.7.jar:na]
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1194) ~[org.scala-lang.scala-library-2.11.7.jar:na]
        at com.metamx.tranquility.druid.input.InputRowObjectWriter.batchAsBytes(InputRowObjectWriter.scala:67) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]
        at com.metamx.tranquility.beam.MessageHolder$$anon$3.batchAsBytes(MessageHolder.scala:83) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]
        at com.metamx.tranquility.druid.DruidBeam$$anonfun$3.apply(DruidBeam.scala:75) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]
        at com.metamx.tranquility.druid.DruidBeam$$anonfun$3.apply(DruidBeam.scala:75) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:370) ~[org.scala-lang.scala-library-2.11.7.jar:na]
        at scala.collection.Iterator$class.foreach(Iterator.scala:742) ~[org.scala-lang.scala-library-2.11.7.jar:na]
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1194) ~[org.scala-lang.scala-library-2.11.7.jar:na]
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) ~[org.scala-lang.scala-library-2.11.7.jar:na]
        at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:183) ~[org.scala-lang.scala-library-2.11.7.jar:na]
        at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45) ~[org.scala-lang.scala-library-2.11.7.jar:na]
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:308) ~[org.scala-lang.scala-library-2.11.7.jar:na]
        at scala.collection.AbstractIterator.to(Iterator.scala:1194) ~[org.scala-lang.scala-library-2.11.7.jar:na]
        at scala.collection.TraversableOnce$class.toList(TraversableOnce.scala:292) ~[org.scala-lang.scala-library-2.11.7.jar:na]
        at scala.collection.AbstractIterator.toList(Iterator.scala:1194) ~[org.scala-lang.scala-library-2.11.7.jar:na]
        at com.metamx.tranquility.druid.DruidBeam.sendAll(DruidBeam.scala:76) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]
        at com.metamx.tranquility.beam.MergingPartitioningBeam$$anonfun$sendAll$2.apply(MergingPartitioningBeam.scala:43) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]
        at com.metamx.tranquility.beam.MergingPartitioningBeam$$anonfun$sendAll$2.apply(MergingPartitioningBeam.scala:42) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778) ~[org.scala-lang.scala-library-2.11.7.jar:na]
        at scala.collection.immutable.Map$Map1.foreach(Map.scala:116) ~[org.scala-lang.scala-library-2.11.7.jar:na]
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777) ~[org.scala-lang.scala-library-2.11.7.jar:na]
        at com.metamx.tranquility.beam.MergingPartitioningBeam.sendAll(MergingPartitioningBeam.scala:42) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]
        at com.metamx.tranquility.beam.ClusteredBeam$$anonfun$sendAll$2$$anonfun$26.apply(ClusteredBeam.scala:388) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]
        at com.metamx.tranquility.beam.ClusteredBeam$$anonfun$sendAll$2$$anonfun$26.apply(ClusteredBeam.scala:379) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0]
        at com.twitter.util.Promise$Transformer.liftedTree1$1(Promise.scala:112) ~[com.twitter.util-core_2.11-6.30.0.jar:6.30.0]
        at com.twitter.util.Promise$Transformer.k(Promise.scala:112) ~[com.twitter.util-core_2.11-6.30.0.jar:6.30.0]
        at com.twitter.util.Promise$Transformer.apply(Promise.scala:122) ~[com.twitter.util-core_2.11-6.30.0.jar:6.30.0]
        at com.twitter.util.Promise$Transformer.apply(Promise.scala:103) ~[com.twitter.util-core_2.11-6.30.0.jar:6.30.0]
        at com.twitter.util.Promise$$anon$1.run(Promise.scala:366) ~[com.twitter.util-core_2.11-6.30.0.jar:6.30.0]
        at com.twitter.concurrent.LocalScheduler$Activation.run(Scheduler.scala:178) ~[com.twitter.util-core_2.11-6.30.0.jar:6.30.0]
        at com.twitter.concurrent.LocalScheduler$Activation.submit(Scheduler.scala:136) ~[com.twitter.util-core_2.11-6.30.0.jar:6.30.0]
        at com.twitter.concurrent.LocalScheduler.submit(Scheduler.scala:207) ~[com.twitter.util-core_2.11-6.30.0.jar:6.30.0]
        at com.twitter.concurrent.Scheduler$.submit(Scheduler.scala:92) ~[com.twitter.util-core_2.11-6.30.0.jar:6.30.0]
        at com.twitter.util.Promise.runq(Promise.scala:350) ~[com.twitter.util-core_2.11-6.30.0.jar:6.30.0]
        at com.twitter.util.Promise.updateIfEmpty(Promise.scala:721) ~[com.twitter.util-core_2.11-6.30.0.jar:6.30.0]
        at com.twitter.util.ExecutorServiceFuturePool$$anon$2.run(FuturePool.scala:107) ~[com.twitter.util-core_2.11-6.30.0.jar:6.30.0]
        ... 5 common frames omitted
Caused by: org.apache.avro.AvroRuntimeException: Not a map: {"type":"enum","name":"device_platform_type_enum","namespace":"com.shopkick.data","symbols":["ios","android","web"]}
        at org.apache.avro.Schema.getValueType(Schema.java:267) ~[na:na]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.7.0_101]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) ~[na:1.7.0_101]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.7.0_101]
        at java.lang.reflect.Method.invoke(Method.java:606) ~[na:1.7.0_101]
        at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:466) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]
        at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:639) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6]
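
For what it's worth, here is a minimal standalone sketch of what I believe is happening, with no Kafka or Tranquility involved, just Avro's GenericData.EnumSymbol handed to a plain Jackson ObjectMapper (the class and variable names here are mine, for illustration only):

import java.util.Arrays;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;

import com.fasterxml.jackson.databind.ObjectMapper;

public class EnumSymbolRepro {
  public static void main(String[] args) throws Exception {
    // Rebuild the enum schema from the "device_platform_type" field above.
    Schema enumSchema = Schema.createEnum(
        "device_platform_type_enum", null, "com.shopkick.data",
        Arrays.asList("ios", "android", "web"));
    Object symbol = new GenericData.EnumSymbol(enumSchema, "ios");
    // Jackson treats EnumSymbol as a bean and walks into its "schema"
    // property; Schema.getValueType() then throws AvroRuntimeException
    // ("Not a map") because an enum schema has no value type.
    new ObjectMapper().writeValueAsString(symbol);
  }
}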


Thanks!

--Ben

Ben Vogan

Jun 29, 2016, 11:54:00 PM
to Druid User
I upgraded to Druid 0.9.1 and tried the new Kafka Indexing Service, and it works with the same messages and schema. I'm very excited that it doesn't require a time window and also guarantees exactly-once semantics. I would still like to understand why Tranquility doesn't work, though.
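
In case it's useful to anyone else, this is roughly the shape of the supervisor spec I'm posting to the overlord. It is trimmed down, and my_datasource, my_topic, and the broker/repo hosts are placeholders; the avro_stream parser and schema_repo decoder come from the druid-avro-extensions module:

{
  "type" : "kafka",
  "dataSchema" : {
    "dataSource" : "my_datasource",
    "parser" : {
      "type" : "avro_stream",
      "avroBytesDecoder" : {
        "type" : "schema_repo",
        "subjectAndIdConverter" : { "type" : "avro_1124", "topic" : "my_topic" },
        "schemaRepository" : { "type" : "avro_1124_rest_client", "url" : "http://schema-repo:8081" }
      },
      "parseSpec" : {
        "format" : "avro",
        "timestampSpec" : { "column" : "timestamp", "format" : "auto" },
        "dimensionsSpec" : { "dimensions" : [ "device_platform_type" ] }
      }
    },
    "metricsSpec" : [ { "type" : "count", "name" : "count" } ],
    "granularitySpec" : { "type" : "uniform", "segmentGranularity" : "HOUR", "queryGranularity" : "NONE" }
  },
  "ioConfig" : {
    "topic" : "my_topic",
    "consumerProperties" : { "bootstrap.servers" : "kafka:9092" }
  }
}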

--Ben

Gian Merlino

Jun 30, 2016, 3:42:26 PM
to druid...@googlegroups.com
Hey Ben,

Based on your stack trace it looks like Tranquility is getting confused by the EnumSymbol object returned by Avro. It's trying to transcode that to JSON for sending to Druid (since Tranquility -> Druid communication is always JSON regardless of your input datatype) but failing. I filed this issue for that: https://github.com/druid-io/tranquility/issues/183
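If you wanted to patch around it in the meantime, the general shape of the fix would be a custom Jackson serializer that writes a GenericEnumSymbol as its plain symbol name instead of bean-serializing it. A rough sketch, assuming you could register a module on the ObjectMapper that does the transcoding (Tranquility doesn't expose that mapper as a user hook today, so treat this as illustrative rather than a drop-in fix; AvroEnumModule is a name I just made up):

import java.io.IOException;

import org.apache.avro.generic.GenericEnumSymbol;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;

public class AvroEnumModule extends SimpleModule {
  public AvroEnumModule() {
    addSerializer(GenericEnumSymbol.class,
        new StdSerializer<GenericEnumSymbol>(GenericEnumSymbol.class) {
          @Override
          public void serialize(GenericEnumSymbol value, JsonGenerator gen,
                                SerializerProvider provider) throws IOException {
            // Write the enum as its plain symbol name, e.g. "ios", instead
            // of letting Jackson reflect into the EnumSymbol's schema.
            gen.writeString(value.toString());
          }
        });
  }
}

With access to the mapper it would just be mapper.registerModule(new AvroEnumModule()).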

Gian

Ben Vogan

Jun 30, 2016, 5:01:16 PM
to druid...@googlegroups.com
Thanks very much for your response, Gian! So far the Kafka indexing service has been stable, but if I run into problems with it I can try changing the enum to a string, as sketched below. I guess people are either (a) not using Avro enums, or (b) not using the Avro format for ingesting data with Tranquility. I wonder if I am making my life more difficult than necessary...
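
For reference, this is the field redefinition I'd try, with the enum collapsed to a plain string (at the cost of losing the enum's symbol validation on the producer side):

{
  "name" : "device_platform_type",
  "type" : [ "null", "string" ],
  "default" : null
}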

--Ben
