Hey Fede and Nikita,
The Kafka indexing service creates a separate
segment for each Kafka partition in order to ensure deterministic
segment generation. The issue is that Kafka only guarantees
message order within a partition, not across partitions. If
two replica tasks each wrote events from multiple partitions into a
single segment, and maxRowsPerSegment was hit and a segment was cut,
each replica's segment could contain different data depending on the
order in which its Kafka consumer happened to receive the events. To work
around the lack of an ordering guarantee across a Kafka topic, we have to
treat each partition separately.
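To make the determinism point concrete, here's a toy sketch (not Druid code; names and data are made up) of two replicas that see the same per-partition streams but in different cross-partition interleavings:

```python
MAX_ROWS_PER_SEGMENT = 2

# Per-partition order is fixed by Kafka...
partition0 = ["a1", "a2", "a3"]
partition1 = ["b1", "b2", "b3"]

# ...but the cross-partition interleaving each consumer sees is not.
replica1_feed = ["a1", "b1", "a2", "b2", "a3", "b3"]
replica2_feed = ["b1", "b2", "a1", "a2", "b3", "a3"]

def cut_segments(events):
    """Cut a new segment every MAX_ROWS_PER_SEGMENT events."""
    return [events[i:i + MAX_ROWS_PER_SEGMENT]
            for i in range(0, len(events), MAX_ROWS_PER_SEGMENT)]

# All partitions into one segment stream: the replicas disagree.
print(cut_segments(replica1_feed))  # [['a1','b1'], ['a2','b2'], ['a3','b3']]
print(cut_segments(replica2_feed))  # [['b1','b2'], ['a1','a2'], ['b3','a3']]

# One segment stream per partition: identical on every replica.
print(cut_segments(partition0))  # [['a1','a2'], ['a3']]
print(cut_segments(partition1))  # [['b1','b2'], ['b3']]
```

Cutting per partition always yields the same segments no matter how the consumer interleaved its reads, which is what makes replica segments byte-identical.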
The other thing to keep in mind is
that taskDuration doesn't align itself on time boundaries the way that
segmentGranularity does. With a segmentGranularity of an hour, realtime
data is bucketed from the start of the current hour to the end of the
current hour, whereas a taskDuration of an hour runs from now until
now+1H. A task that starts mid-hour therefore straddles two hourly
buckets, which is why Nikita is seeing 10+ shards per hour for 5
partitions and why the segment sizes are inconsistent.
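The arithmetic behind that can be sketched like this (a hypothetical helper, not a Druid API; it ignores replicas and intermediate handoffs, and just counts one segment per partition per hourly bucket the task overlaps):

```python
def min_segments(num_partitions, task_start_min,
                 task_duration_min=60, granularity_min=60):
    """Lower bound on segments one task generation creates: one per
    (partition, segmentGranularity bucket) pair the task overlaps."""
    first_bucket = task_start_min // granularity_min
    last_bucket = (task_start_min + task_duration_min - 1) // granularity_min
    buckets = last_bucket - first_bucket + 1
    return num_partitions * buckets

# A 1-hour task starting exactly on the hour touches one hourly bucket...
print(min_segments(5, task_start_min=0))   # 5

# ...but starting at :30 it straddles two buckets: 5 partitions x 2 = 10.
print(min_segments(5, task_start_min=30))  # 10
```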
To
minimize the number of segments created, use as few Kafka partitions as
necessary to meet your Kafka/Druid ingestion
throughput requirements (since partitions cannot be split across
multiple Druid indexing tasks). You can also try increasing
taskDuration, but there are tradeoffs to doing that: the MMs (middle
managers) may need more resources for queries since they're holding onto
data longer, and in the case of a failure more data will need to be
reindexed.
Right now, the
best way to handle excessive segments is to run a Hadoop batch job using a
dataSource inputSpec to re-index the data from the existing segments
into more optimally sized segments. Docs for how to do this are here:
http://druid.io/docs/0.9.1.1/ingestion/update-existing-data.html
In the future, we are looking to add alternate ways to do this that don't require Hadoop.
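As a rough sketch of what such a re-indexing spec looks like (field layout per the linked docs; the dataSource name, interval, and target partition size below are placeholders, and the dataSchema is elided since it should match your existing one):

```json
{
  "type": "index_hadoop",
  "spec": {
    "dataSchema": { "...": "same dataSource name and schema as the original ingestion" },
    "ioConfig": {
      "type": "hadoop",
      "inputSpec": {
        "type": "dataSource",
        "ingestionSpec": {
          "dataSource": "your_datasource",
          "intervals": ["2016-06-01T00:00:00Z/2016-06-02T00:00:00Z"]
        }
      }
    },
    "tuningConfig": {
      "type": "hadoop",
      "partitionsSpec": {
        "type": "hashed",
        "targetPartitionSize": 5000000
      }
    }
  }
}
```

The dataSource inputSpec reads the already-published segments for the given intervals back in as input, so the many small shards get merged into however many segments the partitionsSpec calls for.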
Hope this helps,
David