Ingestion speed on large amounts of sparsely timestamped data.

Karol Woźniak

May 16, 2016, 8:12:12 PM
to Druid User
Hi,

We have a system that sends us about 50,000,000 unique events every 15 minutes (all with the same timestamp). In Druid terms, that is the row count after roll-up. We're investigating the best way to get this data into Druid.

At first, we tried using the "index" task, but found it was far too slow for this (it took more than 1 hour to ingest 1 hour of data).
Then we turned to submitting an "index_realtime" task and pushing data through HTTP. That was much better, as we went down to ~30 minutes per hour of data, but it is still too slow. Surprisingly, sending over more connections in parallel does not seem to offer any significant speedup.
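
For reference, a minimal sketch of pushing events to an "index_realtime" task over HTTP in batches. The host, port, service name, batch size and the `/druid/worker/v1/chat/<serviceName>/push-events` path are assumptions for illustration, not details of the setup described here, so check them against your Druid version. Nothing is sent unless `post_batch` is called.

```python
import json
import urllib.request
from itertools import islice


def push_url(host, port, service_name):
    # Assumed path of the event receiver firehose's chat handler.
    return "http://%s:%d/druid/worker/v1/chat/%s/push-events" % (host, port, service_name)


def batches(events, size):
    # Yield lists of at most `size` events from any iterable.
    it = iter(events)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


def post_batch(url, chunk, timeout=30):
    # One POST per batch; the body is a JSON array of event objects.
    body = json.dumps(chunk).encode("utf-8")
    req = urllib.request.Request(
        url, data=body, headers={"Content-Type": "application/json"}, method="POST"
    )
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return resp.status


# Hypothetical events: one shared timestamp, a unique id per event.
events = ({"timestamp": "2016-05-16T20:00:00Z", "id": i} for i in range(10))
url = push_url("localhost", 8100, "my-ingest-service")
for chunk in batches(events, 4):
    print(len(chunk), "events ->", url)  # swap in post_batch(url, chunk) to send
```

Larger batches mean fewer HTTP round trips per event, which is usually the first thing worth trying before adding more connections.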

We're running the tests on an Intel i7-6700 machine (8 cores) with 32GB of RAM and two SSD drives in RAID0. While ingestion is taking place, the load average is about 4 and there's at most ~100MB/s of IO, so there still seems to be plenty of capacity left. Is there any way to tune a single "index_realtime" task to use more resources (I will attach the Overlord configs, etc. below)? Or is spawning more tasks in parallel the only way? The latter is cumbersome, as a single running task currently uses >10GB of RAM, so we're pretty short on that.

One other problem we've been facing is that sometimes there are errors saying "Tried to write [<some_number>] bytes, which is too much". As far as we understand, that's because there are too many rows with the same timestamp. We think that sharding could help us here; is this correct?

Any general advice on handling this type of data?


Let me know if we can supply any more information.

Thanks!

Gian Merlino

May 16, 2016, 8:17:14 PM
to druid...@googlegroups.com
Hey Karol,

Running parallel tasks is the way to make better use of resources. Each task has only a single thread devoted to indexing. If you're using Tranquility, you can set the parallelism through "task.partitions" (see https://github.com/druid-io/tranquility/blob/master/docs/configuration.md).

10GB of RAM per task sounds like a lot; you should be able to get by with a GB or two of heap and a GB or two off-heap. That 8 core, 32GB machine should be able to run 4–8 tasks.
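
As a rough sanity check of that range, here is a back-of-the-envelope sketch. The function name and the 8GB reserved for the OS and other Druid processes are assumptions for illustration, not figures from this thread.

```python
def max_tasks(total_ram_gb, reserved_gb, heap_gb, offheap_gb, cores):
    # Each task needs heap + off-heap memory and has one indexing thread,
    # so the task count is bounded by both available memory and cores.
    per_task_gb = heap_gb + offheap_gb
    by_memory = int((total_ram_gb - reserved_gb) // per_task_gb)
    return max(0, min(by_memory, cores))


# 32GB machine with 8 cores; the 8GB reserve is an assumed value.
print(max_tasks(32, 8, heap_gb=2, offheap_gb=2, cores=8))  # 6
print(max_tasks(32, 8, heap_gb=1, offheap_gb=1, cores=8))  # 8 (capped by cores)
```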

That should also help with the "wrote too many bytes" errors; those happen when output columns are too large (no single column can be more than 2GB).

Gian

--
You received this message because you are subscribed to the Google Groups "Druid User" group.
To unsubscribe from this group and stop receiving emails from it, send an email to druid-user+...@googlegroups.com.
To post to this group, send email to druid...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/druid-user/d2dc3e16-3bcd-4504-8cb0-68eaf9530d6a%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Karol Woźniak

May 23, 2016, 9:35:55 AM
to druid...@googlegroups.com
Thanks for the answer, Gian. Sorry for the long delay; we've been investigating based on your response.

It seems that Java is just hungry for RAM [surprise ;-)!]: if we set a lower limit, it is still able to keep up without using that much memory. We were able to run 4 tasks on the above machine and cut the ingestion time to about 6-7 hours per day of data, which is a significant improvement.

Still looking for more optimizations, if anyone has any ideas :-).

Partitioning (aka sharding) indeed helps with the "too much bytes" errors, but it makes disk space usage a lot less efficient. We think that partitioning on IDs (that's the only "dimension" we have in this data) would yield better results than partitioning on timestamps (which, like I wrote, are pretty sparse).
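
To illustrate partitioning on IDs, here is a minimal sketch that routes each event to one of N tasks by a stable hash of its ID. The function name, the choice of CRC32 and the partition count are assumptions for illustration; this is not how Druid or Tranquility assign partitions internally.

```python
import zlib
from collections import Counter


def partition_for(event_id, num_partitions):
    # crc32 is stable across processes, unlike Python's built-in hash() for str.
    return zlib.crc32(str(event_id).encode("utf-8")) % num_partitions


# Spread 100,000 hypothetical IDs over 4 partitions and count rows per partition.
counts = Counter(partition_for(i, 4) for i in range(100000))
print(sorted(counts.items()))
```

Because every event in a batch shares one timestamp, hashing on the ID is what spreads the rows across tasks, and it keeps each partition's columns smaller.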

We are not using Tranquility yet, we're mostly waiting for "windowPeriod" removal [Also not enough time to transition current setups...] :-). For now, we've created our own "poor man's Tranquility", which uses "index_realtime" tasks directly.

I have a couple of implementation questions, if you don't mind. Hope you can help :-).

1. Is there a way to send CSV-formatted data to the "index_realtime" endpoint instead of JSON? We've tried setting the same parserSpec that works with the "index" task, but it still says it expects JSON.
2. How can we find out which port is assigned to which task?
We know this is stored in ZooKeeper, but the IDs in ZooKeeper are different from the task IDs in the Overlord.
Looking at the Tranquility code, it seems to use the "runningTasks" endpoint on the Overlord and a "location" property of the task. But when we query this, there's no "location" property. Is there anything else to set up to get these, or am I misunderstanding the code? Right now we've worked around this by setting a separate "serviceName" for each task, but that bloats ZooKeeper a lot.
3. As far as we understand, there's currently no way to finish a task, other than by using the "timed" firehose, right? Any plans/ETAs on changing that?
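
For context on question 3, this is roughly what wrapping a "receiver" firehose in a "timed" firehose looks like when building the task spec. The field names follow the Druid firehose docs as I recall them; the service name, shutoff time and buffer size are hypothetical values.

```python
import json


def timed_receiver_firehose(service_name, shutoff_time, buffer_size=100000):
    # A "receiver" firehose accepts events over HTTP; wrapping it in a
    # "timed" firehose makes the task stop reading at shutoff_time.
    return {
        "type": "timed",
        "shutoffTime": shutoff_time,
        "delegate": {
            "type": "receiver",
            "serviceName": service_name,
            "bufferSize": buffer_size,
        },
    }


# Hypothetical values for illustration only.
spec = timed_receiver_firehose("my-ingest-service", "2016-05-24T00:00:00Z")
print(json.dumps(spec, indent=2))
```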

Thanks again!

--
Best regards,
    Karol Woźniak
aka Kenji Takahashi
 @  kenji.sx

Nishant Bangarwa

May 23, 2016, 10:34:20 AM
to druid...@googlegroups.com
See inline.

On Mon, 23 May 2016 at 19:05 Karol Woźniak <wozn...@gmail.com> wrote:

1. Is there a way to send CSV-formatted data to the "index_realtime" endpoint instead of JSON? We've tried setting the same parserSpec that works with the "index" task, but it still says it expects JSON.

EventReceiverFirehose only supports JSON formatted data at present.

2. How can we find out which port is assigned to which task?
We know this is stored in ZooKeeper, but the IDs in ZooKeeper are different from the task IDs in the Overlord.
Looking at the Tranquility code, it seems to use the "runningTasks" endpoint on the Overlord and a "location" property of the task. But when we query this, there's no "location" property. Is there anything else to set up to get these, or am I misunderstanding the code? Right now we've worked around this by setting a separate "serviceName" for each task, but that bloats ZooKeeper a lot.

Current master has a way to discover the task port using the Overlord HTTP API, to make task discovery simpler ( https://github.com/druid-io/druid/pull/2419 ).
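
A minimal sketch of turning such a response into a task-to-port map. The response shape used here (a list of objects with "id" and a "location" holding "host" and "port") is an assumption about the Overlord's "runningTasks" output, and the sample body is made up; verify it against the version you build.

```python
import json

# Hypothetical response body; the shape is an assumption, not captured output.
sample = """
[
  {"id": "index_realtime_events_0", "location": {"host": "worker1", "port": 8100}},
  {"id": "index_realtime_events_1", "location": {"host": "worker1", "port": 8101}},
  {"id": "index_realtime_events_2"}
]
"""


def task_locations(body):
    # Map task id -> (host, port), skipping tasks with no usable location.
    result = {}
    for task in json.loads(body):
        loc = task.get("location") or {}
        host, port = loc.get("host"), loc.get("port")
        if host and isinstance(port, int) and port > 0:
            result[task["id"]] = (host, port)
    return result


print(task_locations(sample))
```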
 
3. As far as we understand, there's currently no way to finish a task, other than by using the "timed" firehose, right? Any plans/ETAs on changing that?

We recently added a way to manually specify the shutdown time for the event receiver firehose ( https://github.com/druid-io/druid/pull/2803 ).
We hope to release 0.9.1-rc1 in a week or two; until then, you can build a release from current master and try it out.



Karol Woźniak

May 24, 2016, 9:17:19 AM
to druid...@googlegroups.com
Thanks for the answers!

These changes look really good and will simplify our logic quite a bit. Looking forward to the next release :-).