Hello fellow Druids,
I am scoping Druid out for my company and am having difficulties reducing my ingestion times. I'm new to Druid and I haven't had much luck with the documentation I've gone through. Can any of you give me tips for how to reduce ingestion time? I'm shooting for under 1 second ingestion time for each topic. I'm currently seeing a bottleneck that is causing my ingestion times to increase over time. Thanks for any help!
My Cluster:
Data Server: 72-core Intel Xeon at 2.2 GHz, 300 GB RAM
Master Server: 40-core Intel Xeon at 2.2 GHz, 100 GB RAM
Query Server: 72-core Intel Xeon at 2.2 GHz, 300 GB RAM
My middle manager config:
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
druid.service=druid/middleManager
druid.plaintextPort=8091
# Number of tasks per middleManager
druid.worker.capacity=70
# Task launch parameters
druid.indexer.runner.javaOpts=-server -Xms8g -Xmx8g -XX:MaxDirectMemorySize=16g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+ExitOnOutOfMemoryError -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
druid.indexer.task.baseTaskDir=var/druid/task
# HTTP server threads
druid.server.http.numThreads=60
# Processing threads and buffers on Peons
druid.indexer.fork.property.druid.processing.numMergeBuffers=2
druid.indexer.fork.property.druid.processing.buffer.sizeBytes=100MiB
druid.indexer.fork.property.druid.processing.numThreads=70
# Hadoop indexing
druid.indexer.task.hadoopWorkingPath=var/druid/hadoop-tmp
My Data:
Kafka indexing with different topics. Each topic has 4 partitions.
Topic 1: 384,000 messages a second, 48 bits per message
Topic 2: 3,072 messages a second, 8,232 bytes per message
Schema for the topics:
Topic 1:
{
"type": "kafka",
"spec": {
"dataSchema": {
"dataSource": "DruidStaticData3",
"timestampSpec": null,
"dimensionsSpec": null,
"metricsSpec": [],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "SECOND",
"queryGranularity": {
"type": "none"
},
"rollup": false,
"intervals": null
},
"transformSpec": {
"filter": null,
"transforms": []
},
"parser": {
"type": "avro_stream",
"avroBytesDecoder": {
"type": "schema_registry",
"url": "XXXX:8081"
},
"parseSpec": {
"format": "avro",
"timestampSpec": {
"column": "time",
"format": "auto"
}
},
"dimensionsSpec": {
"dimensions": [
{
"name": "id",
"type": "string"
},
{
"name": "value",
"type": "float"
}
]
}
}
},
"partitionsSpec": {
"type": "hashed",
"targetRowsPerSegment": 5000000
},
"ioConfig": {
"topic": "DruidStaticData",
"inputFormat": null,
"replicas": 1,
"taskCount": 4,
"taskDuration": "PT3600S",
"consumerProperties": {
"bootstrap.servers": "XXXX:9092"
},
"pollTimeout": 100,
"startDelay": "PT5S",
"period": "PT30S",
"useEarliestOffset": false,
"completionTimeout": "PT1800S",
"lateMessageRejectionPeriod": null,
"earlyMessageRejectionPeriod": null,
"lateMessageRejectionStartDateTime": null,
"stream": "DruidStaticData",
"useEarliestSequenceNumber": false,
"type": "kafka"
},
"tuningConfig": {
"type": "kafka",
"maxRowsInMemory": 1000000,
"maxBytesInMemory": 0,
"maxRowsPerSegment": 5000000,
"maxTotalRows": null,
"intermediatePersistPeriod": "PT10M",
"basePersistDirectory": "/home/oper/apache-druid-0.20.1/var/tmp/druid-realtime-persist2701612987887319950",
"maxPendingPersists": 0,
"indexSpec": {
"bitmap": {
"type": "roaring",
"compressRunOnSerialization": true
},
"dimensionCompression": "lz4",
"metricCompression": "lz4",
"longEncoding": "longs",
"segmentLoader": null
},
"indexSpecForIntermediatePersists": {
"bitmap": {
"type": "roaring",
"compressRunOnSerialization": true
},
"dimensionCompression": "lz4",
"metricCompression": "lz4",
"longEncoding": "longs",
"segmentLoader": null
},
"buildV9Directly": true,
"reportParseExceptions": false,
"handoffConditionTimeout": 0,
"resetOffsetAutomatically": false,
"segmentWriteOutMediumFactory": null,
"workerThreads": null,
"chatThreads": null,
"chatRetries": 8,
"httpTimeout": "PT10S",
"shutdownTimeout": "PT80S",
"offsetFetchPeriod": "PT30S",
"intermediateHandoffPeriod": "P2147483647D",
"logParseExceptions": true,
"maxParseExceptions":
2147483647,
"maxSavedParseExceptions": 0,
"skipSequenceNumberAvailabilityCheck": false,
"repartitionTransitionDuration": "PT120S"
}
}
}
Topic 2:
{
"type": "kafka",
"spec": {
"dataSchema": {
"dataSource": "DruidStaticData3",
"timestampSpec": null,
"dimensionsSpec": null,
"metricsSpec": [],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "SECOND",
"queryGranularity": {
"type": "none"
},
"rollup": false,
"intervals": null
},
"transformSpec": {
"filter": null,
"transforms": []
},
"parser": {
"type": "avro_stream",
"avroBytesDecoder": {
"type": "schema_registry",
"url": "XXXX:8081"
},
"parseSpec": {
"format": "avro",
"timestampSpec": {
"column": "time",
"format": "auto"
}
},
"dimensionsSpec": {
"dimensions": [
{
"name": "id",
"type": "string"
},
{
"name": "value",
"type": "float"
}
]
}
}
},
"partitionsSpec": {
"type": "hashed",
"targetRowsPerSegment": 5000000
},
"ioConfig": {
"topic": "DruidStaticData",
"inputFormat": null,
"replicas": 1,
"taskCount": 4,
"taskDuration": "PT3600S",
"consumerProperties": {
"bootstrap.servers": "XXXX:9092"
},
"pollTimeout": 100,
"startDelay": "PT5S",
"period": "PT30S",
"useEarliestOffset": false,
"completionTimeout": "PT1800S",
"lateMessageRejectionPeriod": null,
"earlyMessageRejectionPeriod": null,
"lateMessageRejectionStartDateTime": null,
"stream": "DruidStaticData",
"useEarliestSequenceNumber": false,
"type": "kafka"
},
"tuningConfig": {
"type": "kafka",
"maxRowsInMemory": 1000000,
"maxBytesInMemory": 0,
"maxRowsPerSegment": 5000000,
"maxTotalRows": null,
"intermediatePersistPeriod": "PT10M",
"basePersistDirectory": "/home/oper/apache-druid-0.20.1/var/tmp/druid-realtime-persist2701612987887319950",
"maxPendingPersists": 0,
"indexSpec": {
"bitmap": {
"type": "roaring",
"compressRunOnSerialization": true
},
"dimensionCompression": "lz4",
"metricCompression": "lz4",
"longEncoding": "longs",
"segmentLoader": null
},
"indexSpecForIntermediatePersists": {
"bitmap": {
"type": "roaring",
"compressRunOnSerialization": true
},
"dimensionCompression": "lz4",
"metricCompression": "lz4",
"longEncoding": "longs",
"segmentLoader": null
},
"buildV9Directly": true,
"reportParseExceptions": false,
"handoffConditionTimeout": 0,
"resetOffsetAutomatically": false,
"segmentWriteOutMediumFactory": null,
"workerThreads": null,
"chatThreads": null,
"chatRetries": 8,
"httpTimeout": "PT10S",
"shutdownTimeout": "PT80S",
"offsetFetchPeriod": "PT30S",
"intermediateHandoffPeriod": "P2147483647D",
"logParseExceptions": true,
"maxParseExceptions":
2147483647,
"maxSavedParseExceptions": 0,
"skipSequenceNumberAvailabilityCheck": false,
"repartitionTransitionDuration": "PT120S"
}
}
}