Kafka Indexing Service with Kafka message format "Avro"

Gurdeep Singh

Nov 11, 2016, 12:28:07 PM
to Druid User
Hi All,


Will the Kafka indexing service work if my Kafka topic contains Avro-encoded messages? I am trying to write a realtime ingestion spec for a Kafka topic whose messages are in Avro format. If the service does not yet support Avro, what alternatives can I try? Our Kafka topics are not backed by a schema repo, but we do run a Confluent Schema Registry.

Could someone share a working sample spec that consumes messages from an Avro topic?

Regards,
Gurdeep

Ben Vogan

Nov 11, 2016, 12:52:18 PM
to druid...@googlegroups.com
Hi Gurdeep,

We are using Avro plus the Confluent Schema Registry. There is an Avro extension, but it doesn't support Confluent's registry (it uses a different open-source schema repository). I modified the existing Avro extension so that it can be configured to use either one, but I did not try to have it accepted into the main project because it introduces a dependency on another Maven repository (Confluent's). There are a few things I would change if I had time, and my change is unfortunately spread over three commits, but it works for us.


I made these changes against the 0.9.1.1 branch, so I do not know whether they apply cleanly to 0.9.2.
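
For context, the Confluent-specific part mostly comes down to handling Confluent's wire format, which prefixes every message with a magic byte and a 4-byte schema id used to look the schema up in the registry. A rough sketch of the idea (the class and method names here are illustrative, not the actual ones from my commits):

import java.nio.ByteBuffer;

public class ConfluentWireFormat
{
  // Confluent-framed messages start with magic byte 0x00, followed by a
  // 4-byte big-endian schema id, then the Avro-encoded record itself.
  public static int readSchemaId(ByteBuffer payload)
  {
    byte magic = payload.get();
    if (magic != 0x00) {
      throw new IllegalArgumentException("Unknown magic byte: " + magic);
    }
    // Bytes remaining in the buffer after this read are the Avro binary
    // record, decodable once the schema is fetched from the registry by id.
    return payload.getInt();
  }
}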

A supervisor spec would then look like:

{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "your_data_source",
    "parser": {
      "type": "avro_stream",
      "avroBytesDecoder": {
        "type": "schema_repo",
        "subjectAndIdConverter": {
          "type": "confluent",
          "subject": "${kafka_topic}"
        },
        "schemaRepository": {
          "type": "confluent_client",
          "url": "http://${schema_registry_host}:${schema_registry_port}",
          "identityMapCapacity": 1000
        }
      },
      "parseSpec": {
        "format": "timeAndDims",
        "timestampSpec": {
          "column": "your_timestamp_column",
          "format": "auto"
        },
        "dimensionsSpec": {
          "dimensions": []
        }
      }
    },
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "DAY",
      "queryGranularity": "none"
    },
    "metricsSpec": [
      {
        "type": "count",
        "name": "count"
      }
    ]
  },
  "ioConfig": {
    "topic": "${kafka_topic}",
    "consumerProperties": {
      "bootstrap.servers": "${kafka_broker_list}"
    },
    "useEarliestOffset": true,
    "taskCount": ${task_count},
    "replicas": ${task_replicas},
    "taskDuration": "${task_duration}"
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsInMemory": ${max_rows_in_memory},
    "buildV9Directly": true
  }
}
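
(The ${...} tokens are placeholders that we substitute at deploy time; if you are not templating your specs, replace them with literal values. Note that taskCount, replicas, and maxRowsInMemory take bare numbers, not quoted strings.)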

Best of luck,

--Ben


Gurdeep Singh

Nov 14, 2016, 3:30:52 PM
to Druid User
Hi Ben,

Do you have a .tar.gz of your distribution with your code in it? I would like to give it a try.


Regards,
Gurdeep

Ben Vogan

Nov 14, 2016, 4:00:24 PM
to druid...@googlegroups.com
The .tar.gz is rather large and I do not have a convenient place to host it for download. However, you should be able to clone the git repo and build it yourself. You will need git, Maven, and the JDK.

cd druid
git checkout sk_0.9.1.1
cd distribution
mvn package

That will create a new subdirectory called target in which you can find our distribution: sk_druid-0.9.1.1-sk_bin.tar.gz

Our distribution bundles only CDH's Hadoop client and the extensions we use, so you may need to swap those out depending on which features you need.
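
To sanity-check the artifact before deploying, you can list its contents (path assumed from the layout above):

tar tzf target/sk_druid-0.9.1.1-sk_bin.tar.gz | grep avro

You should see the druid-avro-extensions jar among the bundled extensions.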

--Ben


Gurdeep Singh

Nov 15, 2016, 10:21:19 AM
to Druid User
How are you handling nested schemas? Do you use a flattenSpec?

Ben Vogan

Nov 15, 2016, 10:42:19 AM
to druid...@googlegroups.com
We do not use nested schemata for analytics - they are generally hard to work with.  The data is flattened out upstream if necessary.

--Ben


Gurdeep Singh

Nov 15, 2016, 4:51:47 PM
to Druid User
Ben,

I was able to submit a supervisor spec against your distribution, but ran into the following error when running against the Confluent Schema Registry. Any ideas?

2016-11-15T21:45:02,457 INFO [PinPromo-incremental-persist] io.druid.segment.realtime.appenderator.AppenderatorImpl - Committing metadata[FiniteAppenderatorDriverMetadata{activeSegments={}, lastSegmentIds={}, callerMetadata={nextPartitions=KafkaPartitions{topic='pin-promotions-data', partitionOffsetMap={0=119954, 1=119906, 2=120008, 3=120029, 4=119930, 5=119917, 6=119987, 7=120032, 8=119974, 9=119959}}}}] for sinks[].
2016-11-15T21:45:02,461 INFO [task-runner-0-priority-0] io.druid.segment.realtime.appenderator.FiniteAppenderatorDriver - Persisted pending data in 10ms.
2016-11-15T21:45:02,467 INFO [task-runner-0-priority-0] io.druid.segment.realtime.appenderator.AppenderatorImpl - Shutting down...
2016-11-15T21:45:02,471 ERROR [task-runner-0-priority-0] io.druid.indexing.overlord.ThreadPoolTaskRunner - Exception while running task[KafkaIndexTask{id=index_kafka_PinPromo_fccbdd0569600e8_bfficlgh, type=index_kafka, dataSource=PinPromo}]
java.lang.IllegalArgumentException: Provided string is null or empty: 'null'
	at org.schemarepo.RepositoryUtil.validateSchemaOrSubject(RepositoryUtil.java:135) ~[?:?]
	at org.schemarepo.client.Avro1124RESTRepositoryClient.lookup(Avro1124RESTRepositoryClient.java:82) ~[?:?]
	at org.schemarepo.api.TypedSchemaRepository.getSchema(TypedSchemaRepository.java:144) ~[?:?]
	at io.druid.data.input.avro.SchemaRepoBasedAvroBytesDecoder.parse(SchemaRepoBasedAvroBytesDecoder.java:78) ~[?:?]
	at io.druid.data.input.AvroStreamInputRowParser.parse(AvroStreamInputRowParser.java:53) ~[?:?]
	at io.druid.data.input.AvroStreamInputRowParser.parse(AvroStreamInputRowParser.java:33) ~[?:?]
	at io.druid.indexing.kafka.KafkaIndexTask.run(KafkaIndexTask.java:412) ~[?:?]
	at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:436) [druid-indexing-service-0.9.1.1.jar:0.9.1.1]
	at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:408) [druid-indexing-service-0.9.1.1.jar:0.9.1.1]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_101]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_101]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_101]
	at java.lang.Thread.run(Thread.java:745) [?:1.8.0_101]
2016-11-15T21:45:02,478 INFO [task-runner-0-priority-0] io.druid.indexing.overlord.TaskRunnerUtils - Task [index_kafka_PinPromo_fccbdd0569600e8_bfficlgh] status changed to [FAILED].
2016-11-15T21:45:02,480 INFO [task-runner-0-priority-0] io.druid.indexing.worker.executor.ExecutorLifecycle - Task completed with status: {
  "id" : "index_kafka_PinPromo_fccbdd0569600e8_bfficlgh",
  "status" : "FAILED",
  "duration" : 1502
}
2016-11-15T21:45:02,483 INFO [main] com.metamx.common.lifecycle.Lifecycle$AnnotationBasedHandler - Invoking stop method[public void io.druid.server.coordination.AbstractDataSegmentAnnouncer.stop()] on object[io.druid.server.coordination.BatchDataSegmentAnnouncer@3a4aadf8].

Ben Vogan

Nov 15, 2016, 5:09:55 PM
to druid...@googlegroups.com
You can see from the stack trace that it is not using my code; you are still going through the Avro1124RESTRepositoryClient:

at org.schemarepo.client.Avro1124RESTRepositoryClient.lookup(Avro1124RESTRepositoryClient.java:82) ~[?:?]

In your parser spec, did you specify the subjectAndIdConverter's type as "confluent"? If so, I would go back and make sure that you are building from the correct branch (sk_0.9.1.1) and that you have correctly deployed the extension.

--Ben



Gurdeep Singh

Nov 16, 2016, 10:25:50 AM
to Druid User
Ben,

Here is a snapshot of my supervisor spec, which I am posting with the following command:

curl -X POST -H 'Content-Type: application/json' -d @pinPromo.kafka.json http://localhost:8090/druid/indexer/v1/supervisor

{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "PinPromo",
    "parser": {
      "type": "avro_stream",
      "avroBytesDecoder": {
        "type": "schema_repo",
        "subjectAndIdConverter": {
          "type": "confluent",
          "subject": "pin-promotions-data"
        },
        "schemaRepository": {
          "type": "confluent_client",
          "url": "http://xx.xx.xxx.xxx:8081",
          "identityMapCapacity": 1000
        }
      },
      "parseSpec": {
        "format": "timeAndDims",
        "timestampSpec": {
          "column": "eventDate",
          "format": "auto"
        },
        "dimensionsSpec": {
          "dimensions": []
        }
      }
    },
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "DAY",
      "queryGranularity": "none"
    },
    "metricsSpec": [
      {
        "type": "count",
        "name": "count"
      }
    ]
  },
......

I did a build from sk_0.9.1.1 and used the extensions generated by that build. I had to include the two extensions below in my load list (see the property snippet after the list):

druid-avro-extensions
druid-kafka-indexing-service
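
For reference, that is the following entry in my common.runtime.properties (the file location depends on your deployment):

druid.extensions.loadList=["druid-avro-extensions", "druid-kafka-indexing-service"]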

Which extension should contain your code? Let me know what I am doing wrong here.

Regards
Gurdeep

Ben Vogan

Nov 16, 2016, 11:49:53 AM
to druid...@googlegroups.com
I am not sure what is wrong.  The configuration looks fine.  Can you unpack the druid-avro-extensions-0.9.1.1.jar file and confirm that there is a confluent directory in there with CachedSchemaRepositoryClientWrapper.class and ConfluentSubjectAndIdConverter.class files?
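
For example (adjust the path to wherever the extension is deployed):

jar tf extensions/druid-avro-extensions/druid-avro-extensions-0.9.1.1.jar | grep -i confluent

If that prints nothing, the jar you deployed was not built from my branch.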

--Ben


Gurdeep Singh

Nov 16, 2016, 12:59:09 PM
to Druid User
The jar in extensions did not have the confluent classes. I followed your instructions exactly and ran "mvn package". I got sk_druid-0.9.1.1-sk_bin.tar.gz in the target folder under distribution and used that as my Druid distribution. I am not sure why the confluent code is missing from this distribution.

Gurdeep Singh

Nov 21, 2016, 1:03:07 PM
to Druid User
Hi Ben,

I was able to get my supervisor spec running using your distribution, but now I am getting the following error. Do you know where I can update the settings?

2016-11-21T17:57:30,952 INFO [main] io.druid.guice.JsonConfigurator - Loaded class[class io.druid.client.cache.CacheConfig] from props[druid.realtime.cache.] as [io.druid.client.cache.CacheConfig@611a990b]
2016-11-21T17:57:30,953 ERROR [main] io.druid.cli.CliPeon - Error when starting up.  Failing.
com.google.inject.ProvisionException: Guice provision errors:

1) Not enough direct memory.  Please adjust -XX:MaxDirectMemorySize, druid.processing.buffer.sizeBytes, or druid.processing.numThreads: maxDirectMemory[3,817,865,216], memoryNeeded[8,589,934,592] = druid.processing.buffer.sizeBytes[1,073,741,824] * ( druid.processing.numThreads[7] + 1 )
  at io.druid.guice.DruidProcessingModule.getIntermediateResultsPool(DruidProcessingModule.java:108)
  at io.druid.guice.DruidProcessingModule.getIntermediateResultsPool(DruidProcessingModule.java:108)
  while locating io.druid.collections.StupidPool<java.nio.ByteBuffer> annotated with @io.druid.guice.annotations.Global()
    for parameter 1 at io.druid.query.groupby.GroupByQueryEngine.<init>(GroupByQueryEngine.java:79)
  at io.druid.guice.QueryRunnerFactoryModule.configure(QueryRunnerFactoryModule.java:85)
  while locating io.druid.query.groupby.GroupByQueryEngine
    for parameter 0 at io.druid.query.groupby.GroupByQueryRunnerFactory.<init>(GroupByQueryRunnerFactory.java:64)
  at io.druid.guice.QueryRunnerFactoryModule.configure(QueryRunnerFactoryModule.java:82)
  while locating io.druid.query.groupby.GroupByQueryRunnerFactory

Ben Vogan

Nov 21, 2016, 1:15:33 PM
to druid...@googlegroups.com
The error is actually quite informative: you need more direct memory for your particular workload and configuration, and the message lists the variables you can change. It looks like the problem occurs when performing a query, which depends heavily on your data set and query patterns. I'm assuming this is happening in the peons, in which case you need to change their configuration in the middle manager's runtime.properties file (druid.indexer.runner.javaOpts or druid.indexer.runner.javaOptsArray), as sketched below.
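
For example, something along these lines in the middle manager's runtime.properties (the numbers are purely illustrative; size them so that druid.processing.buffer.sizeBytes * (druid.processing.numThreads + 1) fits within MaxDirectMemorySize):

druid.indexer.runner.javaOpts=-server -Xmx2g -XX:MaxDirectMemorySize=10g
druid.indexer.fork.property.druid.processing.buffer.sizeBytes=268435456
druid.indexer.fork.property.druid.processing.numThreads=2

With those fork properties the peons would need only 256 MiB * (2 + 1) = 768 MiB of direct memory.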


Gurdeep Singh

Nov 23, 2016, 12:59:33 PM
to Druid User
Ben,

I was able to get indexing working for Avro. Thank you for the direction. However, my next two hurdles are:

1. Flattening a complex nested schema.
2. Will your code work with secured topics? I have a topic that is accessible only over SSL.

Regards,
Gurdeep

Ben Vogan

Nov 23, 2016, 3:48:32 PM
to druid...@googlegroups.com
There is no support for flattening a nested Avro schema in my code - you would have to add that, or perform the flattening outside of Druid.

I have not tried it with secured topics, but there is nothing in my code specific to Kafka.  The Avro translation piece is separate from communicating with Kafka.
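
I have not verified this, but since the supervisor hands ioConfig.consumerProperties straight to the Kafka consumer, SSL should just be a matter of adding the standard client settings there, something like:

"consumerProperties": {
  "bootstrap.servers": "${kafka_broker_list}",
  "security.protocol": "SSL",
  "ssl.truststore.location": "/path/to/truststore.jks",
  "ssl.truststore.password": "<truststore_password>"
}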

--Ben
