Druid Kafka Batch Ingestion

sigo...@gmail.com

Aug 30, 2016, 4:50:03 PM
to Druid User
Is there a way to do Kafka batch ingestion in Druid starting from a specific offset or partition of a topic? All I see right now is real-time ingestion from Kafka.

Gian Merlino

Aug 30, 2016, 4:54:04 PM
to druid...@googlegroups.com
The new Kafka indexing service (https://imply.io/docs/latest/tutorial-kafka-indexing-service.html) can ingest historical data. If you set "useEarliestOffset" : true then it will read from the beginning of the topic and ingest everything.
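[Editor's note: for reference, a supervisor spec like the one described above is submitted to the overlord's supervisor endpoint (POST /druid/indexer/v1/supervisor). Below is a minimal sketch in Python; the host/port, topic, and column names are assumptions for illustration, not values confirmed by this thread.]

```python
import json
import urllib.request

def build_supervisor_spec(topic, bootstrap_servers, use_earliest=True):
    """Return a minimal 'kafka' supervisor spec as a dict."""
    return {
        "type": "kafka",
        "dataSchema": {
            "dataSource": topic,  # assumption: dataSource named after the topic
            "parser": {
                "type": "string",
                "parseSpec": {
                    "format": "json",
                    "timestampSpec": {"column": "time", "format": "auto"},
                    "dimensionsSpec": {"dimensions": []},
                },
            },
            "metricsSpec": [{"name": "count", "type": "count"}],
            "granularitySpec": {
                "type": "uniform",
                "segmentGranularity": "HOUR",
                "queryGranularity": "NONE",
            },
        },
        "ioConfig": {
            "topic": topic,
            "consumerProperties": {"bootstrap.servers": bootstrap_servers},
            # true = start reading from the earliest available offset,
            # i.e. ingest the topic's historical data from the beginning
            "useEarliestOffset": use_earliest,
        },
    }

def submit_spec(spec, overlord="http://localhost:8090"):
    """POST the spec to the overlord's supervisor endpoint."""
    req = urllib.request.Request(
        overlord + "/druid/indexer/v1/supervisor",
        data=json.dumps(spec).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    return urllib.request.urlopen(req)
```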

Gian

On Tue, Aug 30, 2016 at 1:50 PM, <sigo...@gmail.com> wrote:
Is there a way to do Kafka batch ingestion in Druid starting from a specific offset or partition of a topic? All I see right now is real-time ingestion from Kafka.

--
You received this message because you are subscribed to the Google Groups "Druid User" group.
To unsubscribe from this group and stop receiving emails from it, send an email to druid-user+unsubscribe@googlegroups.com.
To post to this group, send email to druid...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/druid-user/0ab6b0cf-4a1d-42ca-bc1f-5ef5ac7d6b2e%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

sigo...@gmail.com

Aug 30, 2016, 5:42:44 PM
to Druid User
Hello Gian, thank you so much for the quick reply.

I used this as my Kafka indexing spec, but I am currently getting this error: "error":"Could not resolve type id 'kafka' into a subtype of [simple type, class io.druid.indexing.overlord.supervisor.SupervisorSpec]". Do you know the cause of this error? Do I need to add a dependency to druid.extensions.loadList in the common.runtime.properties file?

Also, do you know if I can put PT5M in the segmentGranularity for the granularitySpec? Or does it have to be one of "HOUR", "MINUTE", "SECOND", etc.?

{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "pageviews-kafka",
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "timestampSpec": {
          "column": "time",
          "format": "auto"
        },
        "dimensionsSpec": {
          "dimensions": ["url", "user"]
        }
      }
    },
    "metricsSpec": [
      {"name": "views", "type": "count"},
      {"name": "latencyMs", "fieldName": "latencyMs", "type": "doubleSum"}
    ],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "PT5M",
      "queryGranularity": "NONE"
    }
  },
  "ioConfig": {
    "topic": "pageviews",
    "consumerProperties": {
      "bootstrap.servers": "localhost:9092"
    },
    "useEarliestOffset": true,
    "taskCount": 1,
    "replicas": 1,
    "taskDuration": "PT5M"
  }
}


Gian Merlino

Aug 30, 2016, 5:53:31 PM
to druid...@googlegroups.com
You do need to include the druid-kafka-indexing-service extension. And yeah the segmentGranularity does need to be one of the predefined options.
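[Editor's note: the predefined-granularity fix would look roughly like the fragment below. "FIVE_MINUTE" is an assumption: it is the nearest predefined segmentGranularity name to PT5M among the standard Druid granularities, so check the granularity documentation for your version's exact list.]

```json
"granularitySpec": {
  "type": "uniform",
  "segmentGranularity": "FIVE_MINUTE",
  "queryGranularity": "NONE"
}
```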

Gian


sigo...@gmail.com

Aug 30, 2016, 5:56:54 PM
to Druid User
This is how my druid.extensions.loadList looks in the common.runtime.properties file:

druid.extensions.loadList=["druid-hdfs-storage", "mysql-metadata-storage", "druid-kafka-eight", "druid-kafka-indexing-service"]


I am still getting the same error even after including the "druid-kafka-indexing-service" extension.

