Druid Index Overwrites Data from Previously Indexed Interval


Nick Tate

Dec 22, 2014, 11:03:23 AM
to druid-de...@googlegroups.com
I'm creating an hourly indexing job from Hadoop that takes processed rows and loads them into Druid. A two-hour example of index task intervals would be as follows...

Index 1:  ["2014-12-22T03:59:51/2014-12-22T04:59:48"]

I'm setting an hour granularity for each of the index tasks. After the first task, a time-series query for the day with hour granularity would be similar to the following.

2014-12-22T03:00:00 -- 30
2014-12-22T04:00:00 -- 25,000

This makes sense for our data input and time interval because it is catching a few events from the previous hour, but then the majority from the latter. Then we come to the next index task for the next set of data.

Index 2:  ["2014-12-22T04:59:55/2014-12-22T05:59:43"]

Once again I perform a time-series query, but the output is unexpected.

2014-12-22T03:00:00 -- 30
2014-12-22T04:00:00 -- 42
2014-12-22T05:00:00 -- 23,000

All of the data from the previous time segment seems to be removed. My guess is that since my interval contains 2014-12-22T04:00:00, but only 42 events are being indexed for that hour the second time around, it is removing all of the previous data. My assumption was that the data for these intervals would "roll up" onto the previous aggregate values rather than replace them.
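One way to avoid ragged task intervals like the ones above is to bucket rows by the whole hour they fall in before creating tasks, so each task's interval lines up with segment boundaries and carries all of that hour's rows. A minimal sketch (the event shape here is made up for illustration):

```python
from collections import defaultdict
from datetime import datetime, timedelta

def bucket_by_hour(events):
    """Group events by the hour they fall in, so each indexing task can be
    handed a whole-hour interval containing *all* rows for that hour."""
    buckets = defaultdict(list)
    for ev in events:
        hour = ev["timestamp"].replace(minute=0, second=0, microsecond=0)
        buckets[hour].append(ev)
    return buckets

events = [
    {"timestamp": datetime(2014, 12, 22, 3, 59, 55)},
    {"timestamp": datetime(2014, 12, 22, 4, 12, 0)},
    {"timestamp": datetime(2014, 12, 22, 4, 58, 3)},
]
buckets = bucket_by_hour(events)
for hour, rows in sorted(buckets.items()):
    # One indexing task per complete hour bucket, with a clean interval:
    interval = f"{hour.isoformat()}/{(hour + timedelta(hours=1)).isoformat()}"
    print(interval, len(rows))
```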

Any help would be appreciated.

Thanks!



Fangjin Yang

Dec 22, 2014, 11:13:12 AM
to druid-de...@googlegroups.com
Nick, this is intended behavior, as Druid uses MVCC. We recommend keeping around a copy of your raw data and reindexing the data for a given time interval. There's an outstanding issue to make this process a bit more intuitive (https://github.com/druid-io/druid/issues/418), which is on our roadmap along with potentially reworking Druid's ingestion mechanisms.
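To picture the MVCC semantics: each indexing task publishes a new, higher version of the segments in its interval, and queries see only the highest version; nothing is merged. A toy model (a deliberate simplification, not Druid's actual code):

```python
class SegmentStore:
    """Toy model of MVCC segment replacement: each (interval, version) pair is
    immutable, and queries see only the highest version per interval."""
    def __init__(self):
        self.segments = {}  # interval -> (version, row_count)

    def publish(self, interval, version, row_count):
        current = self.segments.get(interval)
        if current is None or version > current[0]:
            self.segments[interval] = (version, row_count)

    def query(self, interval):
        seg = self.segments.get(interval)
        return seg[1] if seg else 0

store = SegmentStore()
store.publish("2014-12-22T04/05", version=1, row_count=25_000)  # first index task
store.publish("2014-12-22T04/05", version=2, row_count=42)      # second task touches the same hour
print(store.query("2014-12-22T04/05"))  # the 25,000 rows are shadowed, not merged
```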

Nick Tate

Dec 22, 2014, 11:19:02 AM
to druid-de...@googlegroups.com
Thanks for the quick response! Do you have any ongoing discussion threads on how you would overhaul the ingestion mechanisms, or is that an internal process? If you do, I'd love to be a part of them.

- nick

Fangjin Yang

Dec 22, 2014, 11:37:48 AM
to druid-de...@googlegroups.com
There's no formal design thread on it yet, but we were thinking of removing windowPeriod, smartly handling appends of late events, and supporting exactly-once processing.

Joshua Buss

Mar 18, 2015, 1:00:28 AM
to druid-de...@googlegroups.com
Sorry to resurrect an old thread, but this particular topic is the one thing that scares us a bit about Druid. We have "replay events" quite often, and the other stats systems we've used or built all deal with data the same way regardless of the timestamps on it. Druid is the first system we've worked with that might throw data away if it's "too old".

So, yeah: any plans for fixing this, or thoughts on how to deal with delayed data now, would be extremely helpful for us. OR, just a better understanding of the downsides of setting a large windowPeriod. I checked issue 418 but don't see much discussion happening there.

Thanks,

-Josh

Fangjin Yang

Mar 18, 2015, 1:14:16 AM
to druid-de...@googlegroups.com
Joshua, given the current state of message buses (Kafka), stream processors (Storm, Samza), and data stores (Druid), you will never get 100% accuracy with just streaming data in. One day this won't be the case; all the open source projects are working towards a world where batch ingestion is not needed. But right now, if you want 100% accuracy in your data, batch ingestion is very much required. Druid's realtime ingestion is not 100% accurate, which is why we run a lambda architecture in production.

If you are interested, the relevant proposal to fix windowPeriod and handle delayed events is here: https://groups.google.com/forum/#!topic/druid-development/kHgHTgqKFlQ

--
You received this message because you are subscribed to the Google Groups "Druid Development" group.
To unsubscribe from this group and stop receiving emails from it, send an email to druid-developm...@googlegroups.com.
To post to this group, send email to druid-de...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/druid-development/7160ef50-0c86-4708-aca9-2509ce755cef%40googlegroups.com.

For more options, visit https://groups.google.com/d/optout.

Eric Tschetter

Mar 18, 2015, 1:41:22 AM
to druid-de...@googlegroups.com
To be clear, Druid treats all data the same way when doing batch
ingestion. It's only the real-time ingestion piece that can ignore
data, but it can always be fixed back up in batch. And, we have plans
to and most definitely will adjust it so that real-time appends can
also be done regardless of timestamp.

Basically, whatever other system you dealt with, you most likely had
some tables partitioned out by time and if some time period in the
past changed, you went and re-processed that chunk of time in the
past, truncated some partition and replaced it with a new one. That's
the same model that batch ingestion takes, Druid just takes care of
truncating and replacement for you in a seamless manner that doesn't
impact users.

In "data replay" cases, it's actually very difficult to get away from
a restatement model in batch, especially if your "replay" (or
"restatement") might delete some data. If you've already aggregated
some values and you need to remove some of the initial input rows for
those aggregations, it's significantly easier to just re-aggregate
than it is to actually try to apply deltas (at least, everyone I know
that has built a system trying to apply deltas has eventually realized
that it was a fool's errand and restructured it to just restate the
data).
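A tiny sketch of why restatement wins once deletions are in play: re-aggregating from whatever raw rows currently exist is trivially correct, while computing a compensating delta against an already-published aggregate is where systems go wrong (rows here are hypothetical):

```python
def aggregate(rows):
    """Restatement model: recompute the aggregate from whatever raw rows
    currently exist for the interval. Deleting an input row "just works"."""
    return sum(r["count"] for r in rows)

raw = [{"id": 1, "count": 10}, {"id": 2, "count": 5}, {"id": 3, "count": 7}]
print(aggregate(raw))  # 22

# Business logic later invalidates row 2; with restatement we simply
# re-aggregate the remaining raw rows instead of trying to apply a delta:
raw = [r for r in raw if r["id"] != 2]
print(aggregate(raw))  # 17
```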

All of that said, the recommendation is that however you are getting
your data in in real-time, you should also be teeing it off to some
other warehouse that holds the raw data. There are a number of
reasons for this architecture, including:

1) What Fangjin said: no real-time setup with presently available
technology that I am aware of can provide 100% data quality guarantees
on streaming data. Batch processing can provide much stronger
guarantees right now.
2) You don't want to be locked in to any data processing system (even
Druid). Having raw data sitting around gives you the freedom and
flexibility to change your mind about infrastructure later.
3) Druid is best when it is summarizing your data. In general,
storing raw data in Druid is usually more expensive than people want
to pay. If you are storing summaries, you are losing some fidelity in
what is available; having the raw data available in some warehouse
somewhere gives you the flexibility to go back and ask whatever
arbitrary question you want.
4) Druid is not currently a data warehouse replacement; it is a tool
in the open source data warehouse arsenal that can provide fast
queries against summaries of data and has the ability to provide the
operational characteristics required to actually power a user-facing
application.

While I am biased, I actually think that recommending people set up an
architecture where they maintain data in a batch-process-able setup is
setting them up for success in the long run rather than adding
unnecessary complexity. At least, given the current state of the art.

--Eric

Joshua Buss

Mar 18, 2015, 9:15:46 AM
to druid-de...@googlegroups.com
It's wonderful to finally talk to people who've clearly thought about the same problems we've been thinking about, as much as or maybe even more than we have... it means I don't think you guys will flinch if I dump on you the problem we've been trying to solve for nearly 3 years now.

First off, let me explain the first stats system we built, which still works beautifully for its particular use case. Its data has only two dimensions that are indexed (with a single compound index): timestamp and a single unique ID. Here's an example event:

{ "timestamp": 1234567890, "id": "abcdef", "count": 1, "events": { "dictionary": 1, "of": 1, "more": 1, "data": 1 } }

We pre-aggregate all events, using a concept very similar to a window period, in our own home-written Python process, which consolidates the counts of shared keys; the whole point of the system is simply to count the total number of times each of those discrete events happened (including the changing members of that events dictionary). Then we leverage what we've come to realize is one of the best features in the industry: MongoDB's upsert. It atomically adds a new document if that timestamp:id doesn't exist yet, OR increments the values in each sub-field if it does. This means it doesn't matter if data comes in out of order: it's already indexed on timestamp, and our pre-aggregator doesn't have any knowledge of what time it currently is. It just buckets events into its in-memory map, and every minute it runs through the map and upserts everything it has.
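That bucket-then-upsert flow can be sketched in a few lines; here the MongoDB upsert is simulated with an in-memory dict (a real implementation would use an atomic $inc upsert against the collection instead):

```python
from collections import defaultdict

store = {}  # stands in for the collection, keyed by (timestamp, id)

def upsert(key, counts):
    """Simulated $inc upsert: insert if absent, otherwise add counts
    field-wise. Arrival order doesn't matter, which is why late data
    is harmless in this model."""
    doc = store.setdefault(key, defaultdict(int))
    for field, n in counts.items():
        doc[field] += n

# Two flushes of the minute-level pre-aggregator, the second arriving "late":
upsert((1234567890, "abcdef"), {"dictionary": 1, "of": 2})
upsert((1234567890, "abcdef"), {"of": 3, "more": 1})
print(dict(store[(1234567890, "abcdef")]))  # {'dictionary': 1, 'of': 5, 'more': 1}
```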

As for reading, it's all pretty quick thanks to MongoDB's aggregation pipeline. But enough about the simple case; things got rough for us as soon as we started adding more things to index on.

The events from our new system look like this:

{ "timestamp": 1234567890, "id": "abcdef", "metric": <one of a handful of things>, "tags": { "dict": "abcdef", "of": "abcdef", "things": "abcdef" } }

The tags on each metric depend on which metric it is, and the requirement of our new system is that it MUST be queryable by these tags.  MongoDB falls flat on its face now because it doesn't support indexes this complex.  My colleague explained this problem in detail on his blog: http://codyaray.com/2014/11/mongo-multi-key-index-performance.

So, with MongoDB out, we looked at other stats systems. OpenTSDB was one of the main ones we found when it comes to having lots of tags on your metrics, but we're a Cassandra shop, so we found KairosDB and have been using it for about a year now while our stats system is still only taking a small amount of traffic. However, even KairosDB is known to have problems with high-cardinality tags, as it bumps into Cassandra's built-in limitations on very wide rows. Basically, we have millions (eventually billions) of combinations of these tags and their values, and that isn't supported yet in the KairosDB data model.

It does, however, let you post "new" data at any time: if you want to increment the value of a set of things at time T, all you do is read the current value, add your new value, and write that out. That's what our new Python pre-aggregator does with KairosDB, and like I said, it's working for now.


Druid seems like magic to us because it doesn't have any of these high-cardinality problems, but it brings a whole new problem, because we've never considered that a stats system would treat old data and new data differently. Perhaps I just need to read up on batch processing. When we do have "replay events", it just means we found some data orphaned off somewhere due to some other upstream failure, and now we have to account for some missing events. These can be anywhere from a few hours old to days or even weeks old, depending on when we become aware of the issue.

At the end of the day though, we're not doing anything crazy or super complicated - we just have pretty rich data coming in - so you can imagine our frustration on not finding something sooner.

I hope this helps frame our problem better and you can shed some light on what we can try next. We tried using Storm before we came up with the Python pre-aggregator, and it proved to be disastrous because it had to treat each message separately and aggregate it all the way to KairosDB (and update the "state" in KairosDB with another special value), which meant our input rate was coupled to our DB transaction rate; avoiding that is the whole point of using a pre-aggregator in the first place!
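The decoupling in one sketch: the number of DB writes per flush depends on the number of distinct keys in the window, not on the raw event rate (the numbers are illustrative):

```python
from collections import Counter

def flush_batches(events, flush_every):
    """Count the DB writes a pre-aggregator would issue: each flush writes
    at most one row per distinct key, no matter how many raw events
    arrived during the window."""
    writes = 0
    for i in range(0, len(events), flush_every):
        window = events[i:i + flush_every]
        writes += len(Counter(window))  # one upsert per distinct key
    return writes

# 10,000 raw events spread over 3 keys, flushed once per 1,000 events:
events = [f"key{i % 3}" for i in range(10_000)]
print(flush_batches(events, 1_000))  # 30 writes instead of 10,000
```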

Lastly, we can't hold the raw data for too long. It's a ton of data and we're a pretty cheap shop. As it stands now, we have 6 m1.xlarges for our Kafka servers in our largest region, and they can only hold 4 days' worth of data.

-Josh

Eric Tschetter

Mar 19, 2015, 12:00:50 PM
to druid-de...@googlegroups.com
Responses in-line. I'll start, though, by saying that your walk through
the space and the evolution of your system make a lot of sense.

> It does however let you post 'new' data to any time - so if you want to
> increment the value of a set of things at time T, all you do is read the
> current value, add your new value, and write that out. That's what our new
> python pre-aggregator does with kairosdb and like I said, it's working for
> now.

These systems definitely do work. They all essentially fall back to
the traditional "RRD" style model of metrics aggregation, however.
That is, they are optimized for looking at a *single* timeseries
instead of optimized for aggregating over a bunch of individual
timeseries. Depending on use case, this can be exactly what you want
and there are a lot of operational monitoring tools that have gone a
long way and done a lot of stuff with this model.


> Druid seems like magic to us because it doesn't have any of these high
> cardinality problems - but it brings a whole new problem because we've never
> considered the stats system would look at old data and new data as
> different. Perhaps I just need to read up on batch processing - when we do
> have 'replay events', it just means we found some data orphaned off
> somewhere due to some other upstream failure and now we have to account for
> some missing events. These can be anywhere from a few hours old to days or
> even weeks old depending on when we're aware of the issue.

First, let me tell you that Druid is not a panacea :).

With Druid, there are a couple of other reasons you might want to
restate your data:

1) If you have late-coming data, the only way to get it reflected is
to restate the data.
2) Kafka and other upstream systems can create duplicate events; if
you want to have accurate and meaningful deduplication, you generally
must do it in a batch system.
3) Business logic will sometimes want to invalidate old events (this
is particularly true in the ads world, but can also be true in other
places); the only way to invalidate stuff is to restate it.

Given what you have described so far, I'm assuming that your requirements are

1) Ok with double counting of some events
2) Never need to invalidate events
3) Ok with things coming in late and updating old numbers

If that's the case, then it is true that you would be well served by
something that could just take late appends into the system. If so,
you should be very interested in the proposal that Gian made a bit
earlier, because it will serve your purpose exactly. It will take
time to implement, but we are moving in that direction.


> I hope this helps frame our problem better and you can shed some light on
> what we can try next. We tried using storm before we came up with the
> python preaggregator and it proved to be disastrous because it had to treat
> each message separately and aggregated it all the way to kairosDB (and
> update the 'state' in KairosDB with another special value) - which meant our
> input rate was coupled to our db transaction rate - and avoiding that is the
> whole point of using a pre-aggregator in the first place!
>
> Lastly - we can't hold the raw data for too long. It's a ton of data and
> we're a pretty cheap shop. As it stands now, we have 6 m1.xlarges for our
> kafka servers in our largest region and they can only hold 4 days worth of
> data.

If you can put a limit on how much you keep and how late the data can
be delivered, a good interim solution might be to store the last week's
worth of data in S3, deleting a day every day. And, right before you
delete it, run a batch indexing job on EMR to load up whatever was
delivered late. This would give you ~6 days of "window period" to be
able to accept late-coming data.
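That rotation can be sketched as follows (the dates and retention period are illustrative):

```python
from datetime import date, timedelta

def rotation_plan(today: date, retention_days: int = 7):
    """Sketch of the interim scheme: keep the last `retention_days` of raw
    data in S3, and reindex the oldest day right before deleting it so any
    late-arriving events for that day make it into Druid."""
    oldest = today - timedelta(days=retention_days)
    return {
        "reindex_then_delete": oldest.isoformat(),
        "keep": [(oldest + timedelta(days=d + 1)).isoformat()
                 for d in range(retention_days)],
    }

plan = rotation_plan(date(2015, 3, 19))
print(plan["reindex_then_delete"])  # 2015-03-12
print(len(plan["keep"]))            # 7
```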

--Eric

Joshua Buss

Mar 19, 2015, 12:22:03 PM
to druid-de...@googlegroups.com
Awesome reply, thanks for taking the time to read through it and give some more advice.

We're actually not OK with double counting, but we send known rates in to test with and check the outputs, and so far we're not undercounting or overcounting with our current systems.
We've also not seen too many problems yet with Kafka duplicating events, but maybe that will start happening as we increase our scale.

I can tell we've only just begun our journey with Druid. I'd like to test the other deep storage mechanisms (S3 and Hadoop) and understand exactly what our hardware needs would be to store X amount of raw data... if it's not too bad, maybe we can move to a lambda architecture like you use: Druid realtime nodes for realtime queries, then re-process batches at the end of the day or after replay events.

We're also having good luck with Elasticsearch-backed KairosDB, as it doesn't have the cardinality issues; its main problem is that read-then-write patterns quickly kill its index performance. BUT as a long-term store for ad-hoc queries it really excels.

I have a strong feeling we might end up with several components, each serving its own purpose, just like Netflix. I'll definitely let you guys know how things pan out. Our first experiment is to set up a small Hadoop cluster so we can try the batch ingestion.

-Josh

Gian Merlino

Mar 19, 2015, 4:07:39 PM
to druid-de...@googlegroups.com
Joshua, just fyi, there are a couple of specific ways that Kafka can generate duplicates.

1) If you're using the high-level consumer, it is designed to offer at-least-once processing, and can generate dupes during rebalances (scaling up/down) and during fault recovery (restarting a consumer). It's possible to do consuming without generating duplicates if you use the low-level consumer, which Druid will probably end up using at some point in the future.

2) All Kafka producers can generate duplicate messages if there are connectivity issues that cause a retry. The scenario is that they send a message, it gets written to Kafka, but the ack is lost for whatever reason. In that case, the producer will send it again, and you'll get two copies.

So, if you have a mostly static set of consumers that you don't restart often, and a mostly stable network, you generally won't see duplicates. But they can happen. In our world we deal with this through a lambda architecture that has a duplicate-removal step as part of its batch half.
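That batch-side duplicate-removal step can be as simple as keeping the first occurrence of each unique event id (a sketch; the event_id field is an assumed producer-assigned key, not something Kafka attaches for you):

```python
def deduplicate(events):
    """Batch-half duplicate removal: at-least-once delivery can emit the
    same message twice (lost ack + producer retry), so keep only the
    first occurrence of each unique event id."""
    seen = set()
    out = []
    for ev in events:
        if ev["event_id"] not in seen:
            seen.add(ev["event_id"])
            out.append(ev)
    return out

# A redelivered message shows up twice in the stream:
stream = [{"event_id": "a", "count": 1},
          {"event_id": "b", "count": 1},
          {"event_id": "a", "count": 1}]
print(len(deduplicate(stream)))  # 2
```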

Joshua Buss

Mar 22, 2015, 11:24:51 AM
to druid-de...@googlegroups.com
Thanks for the detail. I'll be sure to watch out for it.

As for re-processing, I finally had my first success running a batch ingestion job which replaced a segment previously stored in deep storage. I based my example off the Wikipedia batch ingestion example because I didn't get very far with Hadoop. The indexing task only took a minute for about 100 MB of raw data (I'm using segment sizes of 1 hour now), the handoff to the historical node took about another minute, and the results of my queries afterward were exactly what I was hoping to see.

That all being said, I want to make sure I'm understanding the current capabilities of Druid properly... do I absolutely NEED all of the original raw data that made up that segment in order to combine it with newly-arrived "old data", or is there a way to look at a segment in deep storage and re-index what's in there WITH some new raw data that just arrived?

It would seem to me that whatever's in those deep-storage segments would be sufficient to do a sort of "export" to get the minimum granularity out, which would mean you'd be in a place where you could do a batch ingestion with any new data you want to add... but maybe that's exactly what you're already proposing here. I'm not sure, so I was just hoping someone could clarify that a bit more for me. :)
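Roughly, what I'm imagining is something like this (a sketch with made-up row shapes, not an existing Druid feature): export the old segment's rows at their stored granularity, merge in the late-arriving raw rows, and re-aggregate into the replacement segment.

```python
from collections import defaultdict

def reindex(exported_rows, late_rows):
    """Combine rows exported from an existing segment (at its minimum
    granularity) with late-arriving raw rows, re-aggregating by
    (hour, id) to build the replacement segment's rows."""
    merged = defaultdict(int)
    for row in exported_rows + late_rows:
        merged[(row["hour"], row["id"])] += row["count"]
    return dict(merged)

exported = [{"hour": "2014-12-22T04", "id": "abcdef", "count": 25_000}]
late = [{"hour": "2014-12-22T04", "id": "abcdef", "count": 42}]
print(reindex(exported, late))  # {('2014-12-22T04', 'abcdef'): 25042}
```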

-Josh

Eric Tschetter

Mar 22, 2015, 11:28:58 AM
to druid-de...@googlegroups.com
What you are asking for is not currently possible without code changes, but is a part of what is proposed.

Joshua Buss

Mar 22, 2015, 1:52:18 PM
to druid-de...@googlegroups.com
Understood.  Is an "export" like I mentioned currently possible?  A way to take a segment from deep storage and 'unzip' it?  If not, that might be something our team would be interested in helping with, as I'm not sure it's going to be feasible for us to store our raw data for very long.

-Josh

DriudOrNot

Jun 18, 2017, 12:12:36 AM
to Druid Development, nick...@brandingbrand.com
I suggest you use Elasticsearch with Kibana; Druid is a pain to use. It has a lot of fundamental design issues: indexes getting overwritten, no dynamic dimension support (if you have extra fields that are not in the dimension list, it will reject them), realtime indexing that almost never works, the pain of having to run five services just to get it going, and a monstrous amount of memory. If you reduce the granularity level you will get out-of-memory errors unless you have a gigantic server with 64 GB of RAM.

Why this project even exists with the above design limitations is a mystery. If somebody needs aggregation, just do it in memory and dump the data into reliable storage like Cassandra, or just use Elasticsearch; the TPS will be lower but still acceptable.

I suggest you stop wasting time on Druid and move on.

Gian Merlino

Jun 19, 2017, 6:54:21 PM
to druid-de...@googlegroups.com
Hey GP,

With all due respect, I suggest asking yourself if you're missing something, and if there's a reason that the folks on http://druid.io/druid-powered have all chosen Druid for aggregations on big, complex data.

Gian


cf.ag...@gmail.com

Jun 20, 2017, 12:40:09 AM
to Druid Development
I have been following the group for a while, but this is my first time posting. It's funny that Silicon Valley likes to "disrupt big companies" and "question established ideas", but when their work is challenged, Druid points to big companies as a counterargument. I think GP could be more diplomatic, but the community response to this and other user group posts is defensive and ego-driven.


Gian Merlino

Jun 20, 2017, 10:20:29 AM
to druid-de...@googlegroups.com
Please, if this thread is going to continue, let's try to keep it civil and constructive.

And, CF, welcome: always nice to see a first time poster.

Gian
