'taskDuration' in Kafka Indexing Service causes more problems than it solves


Prashant Deva

Apr 4, 2018, 11:26:30 PM4/4/18
to Druid Development
I believe the 'taskDuration' field for the Kafka Indexing Service does more harm than good. Here is why:

Let's assume that I am creating hourly segments ("segmentGranularity": "HOUR").


If I am ingesting via Kafka, I typically want to persist a segment:

1. When segmentGranularity is reached. That is, every hour.
Thus I want my segments to look like:
01:00-02:00
02:00-03:00


2. If a segment is getting too large, then I want to split it even if the segment granularity boundary hasn't been reached. "maxRowsPerSegment" achieves this.
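As a sketch, the relevant parts of the supervisor spec I am describing look roughly like this (trimmed to the fields discussed; the dataSource, topic, and broker address are placeholders):

```json
{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "my-datasource",
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "HOUR",
      "queryGranularity": "NONE"
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsPerSegment": 5000000
  },
  "ioConfig": {
    "topic": "my-topic",
    "consumerProperties": { "bootstrap.servers": "localhost:9092" }
  }
}
```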


Now there is the `taskDuration` field. I set "taskDuration": "PT1H", assuming this will result in the segments I want above. However, it turns out that is not the case!

Unless I submit my supervisor spec at EXACTLY 01:00, the segments will now be created every hour from when I SUBMITTED my supervisor spec.

So instead of segments like:
01:00-02:00
02:00-03:00

I now get segments like (say I submitted the spec at 01:15):
01:15-02:00
02:00-02:15
02:15-03:00

The segments are now broken not just by segment granularity but also by 'taskDuration'.
This is not what I wanted! Now the segments for every hour are split into at least 2 non-optimally sized segments.


The segments created shouldn't be dependent on when the supervisor spec was submitted.
The 'taskDuration' field is thus not only unnecessary for the Kafka Indexing Service; it actually results in unwanted behavior.


Note: I tried cross-posting to the Apache mailing list. However, the Druid page on the Apache website (http://incubator.apache.org/projects/druid.html) has no instructions on how to actually view or subscribe to the list.



Prashant Deva

Apr 5, 2018, 1:35:48 PM4/5/18
to Parag Jain, Druid Development
Setting it to 3x doesn't solve the problem; it just delays it. In that case, every third hourly segment would still be split into uneven parts.



On Thu, Apr 5, 2018 at 10:31 AM Parag Jain <pja...@oath.com> wrote:
You can keep the task duration at something like 3-5X of the segment granularity to avoid this problem. In an ideal world, where task logs can also be uploaded incrementally to some long-term store, tasks should actually run forever and get replaced only when they fail.


--
Prashant

Jihoon Son

Apr 5, 2018, 4:06:41 PM4/5/18
to Druid Development, d...@druid.incubator.apache.org
Hi Prashant,

The Kafka indexing service is still an experimental feature and is under active development. There might be some issues because of that fast pace of development; I think 'taskDuration' is one such issue.

When the Kafka indexing service was first introduced, it was not capable of incremental handoff, and 'taskDuration' was the only way to publish segments generated by Kafka index tasks.

Incremental handoff for Kafka index tasks is a fairly new feature introduced in 0.12.0, our latest release, so there might be some more issues, including the one you pointed out.

The 'taskDuration' issue might be solved by running Kafka index tasks forever instead of running new tasks per 'taskDuration'. To do so, as Parag pointed out, some issues should be addressed first, like incremental uploading of task logs. Do you have any ideas on how to fix this? I will happily help you.

Jihoon

On Thu, Apr 5, 2018 at 10:35 AM, Prashant Deva <prasha...@gmail.com> wrote:

David Lim

Apr 5, 2018, 6:24:56 PM4/5/18
to Druid Development
Hey Prashant,

The 'taskDuration' configuration is necessary because otherwise the Kafka indexing task would not know when it should stop and publish its segments. Running without this configuration would only be viable if you were working with a stream that never had late data for any reason, and late data is something systems have to deal with. It would also prevent you from reading historical data, which is a supported scenario for the Kafka indexing service.

As an example of what would happen without it, let's say you have a task that is receiving events that all fall into the 2018-04-01T01:00/2018-04-01T02:00 segment. How would you know when that segment is done and the task can complete? Do you wait for the first event with a timestamp > 2018-04-01T02:00? Do you close the segment X minutes after the server time moves into the next hour? What happens if you receive data that falls into that segment later on? Do you create a new task to read that message, and when does that task complete?

Similarly for historical data, if we had a stream that contained events with timestamps from a year ago, how would the task know that it is done?

It is because of these kinds of scenarios that the lifetime of a task can't be coupled to event timestamps (which are what determine the segment intervals).

Like Parag and Jihoon mentioned, incremental handoff should help, but you can still get sub-optimal segments if you have late data. Automated compaction (https://github.com/druid-io/druid/pull/5102) should help with this.
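For anyone following along, a compaction task per that PR is specified roughly like this (the dataSource name and interval are placeholders); it re-indexes the segments in the given interval into optimally sized ones:

```json
{
  "type": "compact",
  "dataSource": "my-datasource",
  "interval": "2018-04-01T01:00:00/2018-04-01T02:00:00"
}
```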

Prashant Deva

Apr 5, 2018, 10:11:54 PM4/5/18
to Druid Development
Jihoon,

> some issues should be addressed first like incremental uploading task logs
Why not persist the logs to Kafka topics? Each task can create a topic that it streams logs to; the logs can then be transferred to deep storage periodically, after each segment is persisted.

David,

The 'windowPeriod' setting in realtime nodes solved this problem.
'taskDuration' feels like it's solving the same issue, but in a worse manner.
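For comparison, a trimmed realtime spec looks roughly like this (placeholder values); 'windowPeriod' just extends how long a segment stays open for late events past its granularity boundary, rather than tying segment boundaries to when the spec was submitted:

```json
{
  "granularitySpec": {
    "type": "uniform",
    "segmentGranularity": "HOUR"
  },
  "tuningConfig": {
    "type": "realtime",
    "windowPeriod": "PT10M"
  }
}
```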

Happy to be corrected if I am wrong.

Prashant

Gian Merlino

Apr 6, 2018, 2:46:20 AM4/6/18
to druid-de...@googlegroups.com, d...@druid.apache.org
Hi Prashant,

Fwiw, even in an ideal world where tasks ran forever (which I do think we want to get to), we may still get imperfectly partitioned segments. It's because unlike "classic" realtime ingestion, the exactly-once version needs to publish all data up to a certain set of Kafka offsets into Druid in one transaction. In a world where messages arrive out of order, there is never a perfect time to do this. It's why we are generally thinking that automated compaction is an important feature to have.

About the mailing lists: we are still setting up the ASF Druid stuff and the mailing list was the first thing to be migrated. The web page isn't there yet. To subscribe to the dev list, send a mail to dev-su...@druid.apache.org. I've cross posted this thread there.

Gian
