Help Optimizing Kafka Indexing

244 views
Skip to first unread message

malachi

unread,
Mar 31, 2021, 10:45:51 AM3/31/21
to Druid User
Hello fellow Druids,
I am scoping Druid out for my company and am having difficulties reducing my ingestion times. I'm new to Druid and I haven't had much luck with the documentation I've gone through. Can any of you give me any tips for how to reduce ingestion time? I'm shooting for under 1 second ingestion time for each topic. I'm currently seeing a bottleneck that is causing my ingestion times to increase over time. Thanks for any help!

My Cluster:
Data Server: 72 core Intel Xeon at 2.2Ghz, 300 gb ram
Master Server: 40 core Intel Xeon at 2.2ghz, 100gb ram
Query Server: 72 core Intel Xeon at 2.2Ghz, 300 gb ram

My middle manager config:
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

druid.service=druid/middleManager
druid.plaintextPort=8091

# Number of tasks per middleManager
druid.worker.capacity=70

# Task launch parameters
druid.indexer.runner.javaOpts=-server -Xms8g -Xmx8g -XX:MaxDirectMemorySize=16g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+ExitOnOutOfMemoryError -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
druid.indexer.task.baseTaskDir=var/druid/task

# HTTP server threads
druid.server.http.numThreads=60

# Processing threads and buffers on Peons
druid.indexer.fork.property.druid.processing.numMergeBuffers=2
druid.indexer.fork.property.druid.processing.buffer.sizeBytes=100MiB
druid.indexer.fork.property.druid.processing.numThreads=70

# Hadoop indexing
druid.indexer.task.hadoopWorkingPath=var/druid/hadoop-tmp

My Data:
Kafka indexing with different topics. Each topic has 4 partitions. 
Topic 1: 384,000 messages a second, 48 bits per message
Topic 2: 3,072 messages a second, 8,232 bytes per message

Schema for the topics:
Topic 1:
{
"type": "kafka",
"spec": {
"dataSchema": {
"dataSource": "DruidStaticData3",
"timestampSpec": null,
"dimensionsSpec": null,
"metricsSpec": [],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "SECOND",
"queryGranularity": {
"type": "none"
},
"rollup": false,
"intervals": null
},
"transformSpec": {
"filter": null,
"transforms": []
},
"parser": {
"type": "avro_stream",
"avroBytesDecoder": {
"type": "schema_registry",
"url": "XXXX:8081"
},
"parseSpec": {
"format": "avro",
"timestampSpec": {
"column": "time",
"format": "auto"
}
},
"dimensionsSpec": {
"dimensions": [
{
"name": "id",
"type": "string"
},
{
"name": "value",
"type": "float"
}
]
}
}
},
"partitionsSpec": {
"type": "hashed",
"targetRowsPerSegment": 5000000
},
"ioConfig": {
"topic": "DruidStaticData",
"inputFormat": null,
"replicas": 1,
"taskCount": 4,
"taskDuration": "PT3600S",
"consumerProperties": {
"bootstrap.servers": "XXXX:9092"
},
"pollTimeout": 100,
"startDelay": "PT5S",
"period": "PT30S",
"useEarliestOffset": false,
"completionTimeout": "PT1800S",
"lateMessageRejectionPeriod": null,
"earlyMessageRejectionPeriod": null,
"lateMessageRejectionStartDateTime": null,
"stream": "DruidStaticData",
"useEarliestSequenceNumber": false,
"type": "kafka"
},
"tuningConfig": {
"type": "kafka",
"maxRowsInMemory": 1000000,
"maxBytesInMemory": 0,
"maxRowsPerSegment": 5000000,
"maxTotalRows": null,
"intermediatePersistPeriod": "PT10M",
"basePersistDirectory": "/home/oper/apache-druid-0.20.1/var/tmp/druid-realtime-persist2701612987887319950",
"maxPendingPersists": 0,
"indexSpec": {
"bitmap": {
"type": "roaring",
"compressRunOnSerialization": true
},
"dimensionCompression": "lz4",
"metricCompression": "lz4",
"longEncoding": "longs",
"segmentLoader": null
},
"indexSpecForIntermediatePersists": {
"bitmap": {
"type": "roaring",
"compressRunOnSerialization": true
},
"dimensionCompression": "lz4",
"metricCompression": "lz4",
"longEncoding": "longs",
"segmentLoader": null
},
"buildV9Directly": true,
"reportParseExceptions": false,
"handoffConditionTimeout": 0,
"resetOffsetAutomatically": false,
"segmentWriteOutMediumFactory": null,
"workerThreads": null,
"chatThreads": null,
"chatRetries": 8,
"httpTimeout": "PT10S",
"shutdownTimeout": "PT80S",
"offsetFetchPeriod": "PT30S",
"intermediateHandoffPeriod": "P2147483647D",
"logParseExceptions": true,
"maxParseExceptions": 2147483647,
"maxSavedParseExceptions": 0,
"skipSequenceNumberAvailabilityCheck": false,
"repartitionTransitionDuration": "PT120S"
}
}
}

Topic 2:
{
"type": "kafka",
"spec": {
"dataSchema": {
"dataSource": "DruidStaticData3",
"timestampSpec": null,
"dimensionsSpec": null,
"metricsSpec": [],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "SECOND",
"queryGranularity": {
"type": "none"
},
"rollup": false,
"intervals": null
},
"transformSpec": {
"filter": null,
"transforms": []
},
"parser": {
"type": "avro_stream",
"avroBytesDecoder": {
"type": "schema_registry",
"url": "XXXX:8081"
},
"parseSpec": {
"format": "avro",
"timestampSpec": {
"column": "time",
"format": "auto"
}
},
"dimensionsSpec": {
"dimensions": [
{
"name": "id",
"type": "string"
},
{
"name": "value",
"type": "float"
}
]
}
}
},
"partitionsSpec": {
"type": "hashed",
"targetRowsPerSegment": 5000000
},
"ioConfig": {
"topic": "DruidStaticData",
"inputFormat": null,
"replicas": 1,
"taskCount": 4,
"taskDuration": "PT3600S",
"consumerProperties": {
"bootstrap.servers": "XXXX:9092"
},
"pollTimeout": 100,
"startDelay": "PT5S",
"period": "PT30S",
"useEarliestOffset": false,
"completionTimeout": "PT1800S",
"lateMessageRejectionPeriod": null,
"earlyMessageRejectionPeriod": null,
"lateMessageRejectionStartDateTime": null,
"stream": "DruidStaticData",
"useEarliestSequenceNumber": false,
"type": "kafka"
},
"tuningConfig": {
"type": "kafka",
"maxRowsInMemory": 1000000,
"maxBytesInMemory": 0,
"maxRowsPerSegment": 5000000,
"maxTotalRows": null,
"intermediatePersistPeriod": "PT10M",
"basePersistDirectory": "/home/oper/apache-druid-0.20.1/var/tmp/druid-realtime-persist2701612987887319950",
"maxPendingPersists": 0,
"indexSpec": {
"bitmap": {
"type": "roaring",
"compressRunOnSerialization": true
},
"dimensionCompression": "lz4",
"metricCompression": "lz4",
"longEncoding": "longs",
"segmentLoader": null
},
"indexSpecForIntermediatePersists": {
"bitmap": {
"type": "roaring",
"compressRunOnSerialization": true
},
"dimensionCompression": "lz4",
"metricCompression": "lz4",
"longEncoding": "longs",
"segmentLoader": null
},
"buildV9Directly": true,
"reportParseExceptions": false,
"handoffConditionTimeout": 0,
"resetOffsetAutomatically": false,
"segmentWriteOutMediumFactory": null,
"workerThreads": null,
"chatThreads": null,
"chatRetries": 8,
"httpTimeout": "PT10S",
"shutdownTimeout": "PT80S",
"offsetFetchPeriod": "PT30S",
"intermediateHandoffPeriod": "P2147483647D",
"logParseExceptions": true,
"maxParseExceptions": 2147483647,
"maxSavedParseExceptions": 0,
"skipSequenceNumberAvailabilityCheck": false,
"repartitionTransitionDuration": "PT120S"
}
}
}


Ben Krug

unread,
Mar 31, 2021, 5:42:17 PM3/31/21
to druid...@googlegroups.com
I'm not sure about the druid configuration, but my first thought was to ask whether you've tried increasing the number of kafka partitions (if possible), so you can get more parallelism?  I think each partition might get a single reader/ingestor task (off the top of my head, could be wrong).

But that's without really analyzing the druid config, and without knowing exactly where the bottleneck is.  Just a guess you might try.  If not that, it's back to the druid configuration and setup.

--
You received this message because you are subscribed to the Google Groups "Druid User" group.
To unsubscribe from this group and stop receiving emails from it, send an email to druid-user+...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/druid-user/4b1d9640-ca14-4cec-b97f-f57a89d2b27dn%40googlegroups.com.

Ben Krug

unread,
Mar 31, 2021, 5:43:52 PM3/31/21
to druid...@googlegroups.com
I forgot to add, if you do try increasing the number of partitions, with your cores, you might well try, e.g., 16 partitions per topic, if that is possible.  (Not sure of your partitioning scheme, and whether it's open to change.)

malachi gierzak

unread,
Apr 1, 2021, 7:18:51 AM4/1/21
to druid...@googlegroups.com
Thank you Ben,
Is it possible to create more parallelism without modifying the kafka topics? Possibly by using partitioning or segmentation in my Druid config?

You received this message because you are subscribed to a topic in the Google Groups "Druid User" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/druid-user/OuEHMQj7BwA/unsubscribe.
To unsubscribe from this group and all its topics, send an email to druid-user+...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/druid-user/CAP%2BetTs-%3DnrA3zLMqdX%2BpUCudang5muuqqUg-1bXVmnjN9SsSw%40mail.gmail.com.

malachi gierzak

unread,
Apr 1, 2021, 12:32:48 PM4/1/21
to druid...@googlegroups.com
I tried using 16 partitions and it worked wonderfully for most of my data. It looks like I'll need to scale my data server to handle the larger topics. Thank you for the help!

Ben Krug

unread,
Apr 1, 2021, 12:56:35 PM4/1/21
to druid...@googlegroups.com
I don't think it's possible, but I could be wrong.
Flying blind here, but with the size of your RAM, you might try, eg

druid.indexer.fork.property.druid.processing.buffer.sizeBytes=1000000000

(eg, set it to 1G as a test) and see whether that makes a difference.  It's a shot in the dark, though.  If it doesn't help, revert it.  If it does, start thinking about how many concurrent tasks and queries you want to plan for, and whether RAM might get exhausted - maybe try dialing it down to find a sweet spot.

Ben Krug

unread,
Apr 1, 2021, 12:57:23 PM4/1/21
to druid...@googlegroups.com
Oh, our notes crossed paths - glad to hear that increasing partitions worked!  That's really the best approach to start with.

malachi gierzak

unread,
Apr 1, 2021, 1:36:44 PM4/1/21
to druid...@googlegroups.com
I've tried that before and it actually increased my ingestion times. I have some larger topics that are 2 to 4 times as large as topic 2 posted above. I think that scaling out my data servers and increasing partition counts for my larger topics will be the best way to get the results I need. Thank you for your help Ben!

Peter Marshall

unread,
Apr 16, 2021, 7:24:58 AM4/16/21
to Druid User
late to the party again... but just noting it's good to have a nice ratio between partitions and workers - something that divides well - like if you have 8 partitions, have 4 taskCount - that will put two partitions in each worker. (:P

Peter Marshall

unread,
Apr 16, 2021, 7:25:56 AM4/16/21
to Druid User
oh and it can also help to think about 1 worker doing, like, 10,000 events per second.  Though the only way to really see throughput is to look at the metrics that get emitted.  then you can kinda work backwards to work out how many workers you need to have running to keep up with the velocity of the data.
Reply all
Reply to author
Forward
0 new messages