Too many shards (kafka indexing services)


Fede

Aug 30, 2016, 3:50:07 AM
to Druid User
Hi there!

I'm using the Kafka indexing service introduced in the 0.9.1 release. Everything works just fine, but I'm getting a huge number of shards per segment (granularity is set to DAY).

I have two tasks running for each dataSource, and I pull data from a MySQL db with a service that runs every hour to fetch the latest records. This is creating more than 100 shards per segment, and I think this is going to lower query performance a lot.

Could you please explain why this is happening? Can I prevent the service from creating so many shards?

Thank you in advance!

F.

Nikita Salnikov-Tarnovski

Aug 30, 2016, 9:17:30 AM
to Druid User
I have the exact same situation. The Kafka indexing service is reading from 1 Kafka topic with 5 partitions, segment granularity is set to "hour", and it ends up with 10+ shards per hour. The resulting segment size is around 240 MB; some shards are 0 bytes or 8 KB, some are 24 MB.

And queries have a huge "query/wait/time".

David Lim

Aug 30, 2016, 2:42:31 PM
to Druid User
Hey Fede and Nikita,

The Kafka indexing service creates a different segment for each Kafka partition in order to ensure deterministic segment generation. The issue is that Kafka only guarantees message order within a partition, not across partitions. If two replica tasks each wrote events from multiple partitions into a single segment, then when maxRowsPerSegment was hit and a segment was cut, each replica's segment could contain different data depending on the order in which its Kafka consumer received the events. To get around this lack of ordering across the topic as a whole, we have to treat each partition separately.

The other thing to keep in mind is that taskDuration doesn't align itself on time boundaries the way that segmentGranularity does. A segmentGranularity of an hour with realtime data covers the period from the start of the current hour to the end of the current hour, whereas a taskDuration of an hour runs from now until now+1H. This is why Nikita is seeing 10+ shards per hour for 5 partitions and why the segment sizes are not consistent.

To minimize the number of segments created, use the fewest Kafka partitions that still allow you to meet your Kafka/Druid ingestion throughput requirements (a single partition cannot be split across multiple Druid indexing tasks). You can also try increasing taskDuration, but there are tradeoffs to doing that (middle managers may need more resources for queries since they hold onto data longer, and in the case of a failure more data will need to be reindexed).
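
To make those knobs concrete, here's a rough sketch of the relevant parts of a Kafka supervisor spec (the topic, servers, and values are just placeholders for illustration, not recommendations, and other required fields like the parser are omitted):

{
    "type": "kafka",
    "dataSchema": {
        "dataSource": "dataSourceName",
        "granularitySpec": {
            "type": "uniform",
            "segmentGranularity": "HOUR",
            "queryGranularity": "NONE"
        }
    },
    "tuningConfig": {
        "type": "kafka",
        "maxRowsPerSegment": 5000000
    },
    "ioConfig": {
        "topic": "your_topic",
        "consumerProperties": {
            "bootstrap.servers": "kafkahost:9092"
        },
        "taskCount": 1,
        "replicas": 1,
        "taskDuration": "PT1H"
    }
}

Roughly speaking, the number of shards per segmentGranularity interval scales with (number of Kafka partitions) x (number of task intervals overlapping that segment interval), which is why fewer partitions and a longer taskDuration both reduce the shard count.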

Right now, the best way to handle excessive segments is to run a Hadoop batch job using a dataSource inputSpec to re-index the data from the existing segments into more optimally sized segments. Docs for how to do this are here:

http://druid.io/docs/0.9.1.1/ingestion/update-existing-data.html
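
The key difference from a regular Hadoop batch ingestion spec is the ioConfig, which points a dataSource inputSpec at the existing segments; something along these lines (the datasource name and interval are placeholders):

"ioConfig": {
    "type": "hadoop",
    "inputSpec": {
        "type": "dataSource",
        "ingestionSpec": {
            "dataSource": "dataSourceName",
            "intervals": ["2016-08-01T00:00:00.000Z/2016-08-31T00:00:00.000Z"]
        }
    }
}

The rest of the index_hadoop task (dataSchema, tuningConfig) looks like a normal Hadoop batch job for that datasource.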

In the future, we are looking to add alternate ways to do this that don't require Hadoop.

Hope this helps,
David

Fede

Aug 31, 2016, 3:31:28 AM
to Druid User
Thank you David, 

I'll try the reindexing and see how it works. I'll post the results later.

php...@private.social

Aug 9, 2017, 11:48:33 AM
to Druid User
Hi David,
I have the same issue with the Kafka indexing service. Do you have a good example of how to run a daily/weekly re-index task to keep the data optimized? Or is it only a manual job?
I'm not sure I understand from the doc how re-indexing will work... is it possible to re-index data by ingesting into the same datasource, or will I always end up with 2 sources: the initial one (huge number of small shards) and the optimized one?

Arpan Khagram

Aug 12, 2017, 11:42:09 AM
to Druid User
Hi - Yes, it is possible to re-index the data of a specific datasource and push it back to the same datasource. It will replace the earlier segments with new ones, and once done, your queries will be served by the new segments.

Regards,
Arpan Khagram

PHP Dev

Aug 12, 2017, 11:52:27 AM
to druid...@googlegroups.com
Hi 
Thanks. Where can I find an example of such an operation? Some example queries would be really helpful.


Arpan Khagram

Aug 21, 2017, 9:15:23 AM
to Druid User
Hi - There are 2 ways you can do it: one is through an index task with the ingestSegment firehose, and the other is through a Hadoop batch job.

Firehose indexing works but should only be used for prototyping or small amounts of data. If you need to re-index a large data set, you should go for a Hadoop batch job. I shall provide you with a sample task JSON.

Regards,
Arpan Khagram

php...@private.social

Aug 29, 2017, 6:21:35 AM
to Druid User
Thanks, Arpan.

Here is a working example of a re-indexing firehose job in case someone needs it.

{
    "type": "index",
    "spec": {
        "dataSchema": {
            "parser": {
                "type": "string",
                "parseSpec": {
                    "format": "json",
                    "timestampSpec": {
                        "column": "timestamp",
                        "format": "auto"
                    },
                    "dimensionsSpec": {
                        "dimensions": [
                            "dimension1",
                            "dimensionN"
                        ],
                        "dimensionExclusions": [],
                        "spatialDimensions": []
                    }
                }
            },
            "granularitySpec": {
                "type": "uniform",
                "segmentGranularity": "WEEK",
                "queryGranularity": "HOUR",
                "rollup": true,
                "intervals": [
                    "2017-07-03T00:00:00.000Z\/2017-08-28T00:00:00.000Z"
                ]
            },
            "dataSource": "dataSourceName",
            "metricsSpec": [
                {
                    "type": "longSum",
                    "name": "metric1",
                    "fieldName": "Metric1"
                },
                {
                    "type": "longSum",
                    "name": "metricN",
                    "fieldName": "MetricN"
                }
            ]
        },
        "ioConfig": {
            "type": "index",
            "firehose": {
                "type": "ingestSegment",
                "dataSource": "dataSourceName",
                "interval": "2017-07-03T00:00:00.000Z\/2017-08-28T00:00:00.000Z"
            }
        },
        "tuningConfig": {
            "type": "index",
            "targetPartitionSize": 5000000,
            "maxRowsInMemory": 75000
        }
    }
}
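
(To run it, POST the spec to the Overlord's task endpoint, /druid/indexer/v1/task. Once the task completes, the newly built segments replace the old small shards for the interval covered, as Arpan described above.)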