Druid kafka avro


Younes

Aug 2, 2016, 10:59:09 AM
to Druid Development
Hi all,

I'm trying to set up a realtime Kafka feed to Druid.
I followed http://druid.io/docs/latest/development/extensions-core/avro.html

Here is my spec:
{
  "dataSources" : [
    {
      "spec" : {
        "dataSchema" : {
          "dataSource" : "ds",
          "parser" : {
            "type" : "avro_stream",
            "avroBytesDecoder" : {
              "type" : "schema_repo",
              "subjectAndIdConverter" : {
                "type" : "avro_1124",
                "topic" : "mytopics"
              },
              "schemaRepository" : {
                "type" : "avro_1124_rest_client",
                "url" : "http://localhost:8081"
              }
            },
            "parseSpec" : {
              "format": "timeAndDims",
              "timestampSpec": { "column" : "date", "format" : "auto" },
              "dimensionsSpec": { "dimensions" : [] }
            }
          },
          "granularitySpec" : {
            "type" : "uniform",
            "segmentGranularity" : "hour",
            "queryGranularity" : "none"
          },
          "metricsSpec" : [
            {
              "type" : "count",
              "name" : "count"
            }
          ]
        },
        "ioConfig" : {
          "type" : "realtime"
        },
        "tuningConfig" : {
          "type" : "realtime",
          "maxRowsInMemory" : "100000",
          "intermediatePersistPeriod" : "PT10M",
          "windowPeriod" : "PT10M"
        }
      },
      "properties" : {
        "task.partitions" : "1",
        "task.replicants" : "1",
        "topicPattern" : "mytopics.*"
      }
    }
  ],
  "properties" : {
    "zookeeper.connect" : "localhost",
    "druid.discovery.curator.path" : "/druid/discovery",
    "druid.selectors.indexing.serviceName" : "druid/overlord",
    "commit.periodMillis" : "15000",
    "consumer.numThreads" : "2",
    "kafka.zookeeper.connect" : "localhost",
    "kafka.group.id" : "tranquility-kafka-2",
    "serialization.format" : "smile",
    "druidBeam.taskLocator": "overlord"
  }
}

I keep getting this:

2016-08-02 14:54:56,244 [KafkaConsumer-0] ERROR c.m.tranquility.kafka.KafkaConsumer - Exception:
java.lang.NullPointerException: writer cannot be null!
        at org.apache.avro.io.ResolvingDecoder.resolve(ResolvingDecoder.java:80) ~[na:na]
        at org.apache.avro.io.ResolvingDecoder.<init>(ResolvingDecoder.java:49) ~[na:na]
        at org.apache.avro.io.DecoderFactory.resolvingDecoder(DecoderFactory.java:307) ~[na:na]
        at org.apache.avro.generic.GenericDatumReader.getResolver(GenericDatumReader.java:125) ~[na:na]
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:140) ~[na:na]

Any help? This is driving me nuts!


Thanks,



Younes

Aug 8, 2016, 12:55:56 PM
to Druid Development
Turns out this is meant to work with Schema Repo, and I'm using schema-manager from Confluent.

Thanks,

Himanshu Gupta

Aug 15, 2016, 6:30:07 PM
to Druid Development
Hi Younes,

Do you have a single schema for all your events, or multiple schemas?

-- Himanshu

Younes

Aug 19, 2016, 1:57:42 PM
to druid-de...@googlegroups.com
3 schemas.

Y
--
You received this message because you are subscribed to a topic in the Google Groups "Druid Development" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/druid-development/7ccnDQVahDE/unsubscribe.
To unsubscribe from this group and all its topics, send an email to druid-developm...@googlegroups.com.
To post to this group, send email to druid-de...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/druid-development/a018f517-1cdd-4bc8-a5ae-c91383e1edb1%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Himanshu Gupta

Aug 20, 2016, 12:17:24 AM
to Druid Development
I guess you can skip schema_repo if you try out https://github.com/druid-io/druid/pull/3368/files. That will provide a simplified parsing setup, and we can see if your error goes away.

-- Himanshu

Younes

Aug 20, 2016, 8:39:39 AM
to druid-de...@googlegroups.com
Thank you, I've already started using it. I'll post an update as soon as my test is done.
Can we add support for schema-manager?

Y

Kenji Noguchi

Aug 30, 2016, 11:38:45 PM
to Druid Development
FWIW, here are my notes from getting the Avro extension working with Schema Repo.

- Any failure to fetch the schema from the Schema Repo results in the "java.lang.NullPointerException: writer cannot be null!" error.
- The "url" for the schema repo in the Tranquility config has to include /schema-repo if you are running the off-the-shelf Schema Repo binary.
- A dash (-) in the schema name is illegal.

Feel free to ask me for help with further debugging.

My suggestions to the maintainer:
- SchemaRepoBasedAvroBytesDecoder.java, around line 77, should check for null and raise an exception. More diagnostic messages would be helpful, but basically something like this:

Schema schema = typedRepository.getSchema(subjectAndId.lhs, subjectAndId.rhs);
if (schema == null) {
  throw new ParseException("schema lookup failed!!!");
}

- The documentation should include Schema Repo setup steps. An example config would be very helpful, e.g. conf-quickstart/tranquility/avro.json.
- The documentation should be explicit about the invalid characters.
- The "topic" field of the "subjectAndIdConverter" should be renamed to "subject", because that is the correct terminology in the Avro world: both Schema Repo and Schema Registry call it a subject. It also has nothing to do with the Kafka topic. Using the Kafka topic as the subject was just a convention introduced by the original author of the Avro extension.
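
To make the null-check suggestion in the first item concrete, here is a minimal, self-contained sketch of a lookup that fails fast with a diagnostic message instead of handing a null schema to the Avro reader. It is not the extension's code: FailFastSchemaLookup, register, and getSchemaOrThrow are hypothetical names, and an in-memory map stands in for the real schema repository.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a schema repository: subject -> (schema id -> schema JSON).
// Only illustrates the "check for null and raise an exception" idea.
final class FailFastSchemaLookup {
    private final Map<String, Map<Integer, String>> schemas = new HashMap<>();

    // Registers a schema under a subject and numeric id (illustrative only).
    void register(String subject, int id, String schemaJson) {
        schemas.computeIfAbsent(subject, s -> new HashMap<>()).put(id, schemaJson);
    }

    // Returns the schema, or throws with the subject and id in the message,
    // so a failed lookup does not surface later as "writer cannot be null!".
    String getSchemaOrThrow(String subject, int id) {
        Map<Integer, String> bySubject = schemas.get(subject);
        String schema = (bySubject == null) ? null : bySubject.get(id);
        if (schema == null) {
            throw new IllegalStateException(
                "schema lookup failed for subject [" + subject + "], id [" + id + "]");
        }
        return schema;
    }
}
```

With an error like this, a wrong repository URL or an unregistered subject would show up by name in the log rather than as an NPE from inside Avro.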

Cheers,
Kenji Noguchi
 

Kenji Noguchi

Aug 30, 2016, 11:48:44 PM
to Druid Development
One more note:

The 4-byte schema ID header is also peculiar to the extension.
The documentation needs to explain this non-standard header. It's a big-endian int32, by the way.
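
For anyone producing messages by hand, a rough sketch of that framing follows. It assumes the header simply precedes the Avro-encoded bytes in each Kafka message; the class and method names (SchemaIdFraming, frame, readSchemaId, payload) are made up for illustration and are not part of the extension.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

// Illustrative helper, not part of Druid: prepends and reads a
// 4-byte big-endian int32 schema ID in front of an Avro payload.
final class SchemaIdFraming {
    static final int HEADER_BYTES = 4;

    // Builds [schema id as big-endian int32][avro payload].
    static byte[] frame(int schemaId, byte[] avroPayload) {
        return ByteBuffer.allocate(HEADER_BYTES + avroPayload.length)
                .order(ByteOrder.BIG_ENDIAN) // ByteBuffer's default, stated explicitly
                .putInt(schemaId)
                .put(avroPayload)
                .array();
    }

    // Reads the schema ID back from the first four bytes.
    static int readSchemaId(byte[] message) {
        if (message.length < HEADER_BYTES) {
            throw new IllegalArgumentException("message shorter than the 4-byte header");
        }
        return ByteBuffer.wrap(message).order(ByteOrder.BIG_ENDIAN).getInt();
    }

    // Returns the bytes after the header, i.e. the Avro-encoded record.
    static byte[] payload(byte[] message) {
        if (message.length < HEADER_BYTES) {
            throw new IllegalArgumentException("message shorter than the 4-byte header");
        }
        return Arrays.copyOfRange(message, HEADER_BYTES, message.length);
    }

    public static void main(String[] args) {
        byte[] message = frame(1, new byte[] {0x0A, 0x0B});
        System.out.println(Arrays.toString(message));
    }
}
```

A producer would call frame(id, avroBytes) before sending, with id being whatever ID the schema was registered under for the subject.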

Younes

Aug 31, 2016, 8:41:43 PM
to druid-de...@googlegroups.com
Hi,

I'm not using Schema Repo; I'm using schema-manager from Confluent. As of now, there is no support for it. Until schema-manager is supported, I'm using the new feature that lets me specify my own schema.

Thanks

Gurdeep Singh

Nov 2, 2016, 5:39:23 PM
to Druid Development
Can you post an example inline schema you used to get around schema-manager?

Regards,
Gurdeep

David Lim

Nov 4, 2016, 1:27:04 PM
to Druid Development
Does schema-manager == Confluent Schema Registry? This PR might interest you: https://github.com/druid-io/druid/pull/3529

jakob....@codecentric.de

Nov 7, 2016, 6:57:33 AM
to Druid Development