Indexing strategy for very heterogeneous data


jon.tr...@optimizely.com

Mar 2, 2015, 2:30:04 PM
to druid-de...@googlegroups.com
We are evaluating Druid as a possible store for counts data here at Optimizely, and are running into some issues with index generation. For the time being, I'm only working with batch index generation using the indexing service, though eventually we plan to turn on real-time indexing. The problem we're running into is that the indexes being produced are large (about 50gb per day) but largely empty (they compress about 20-25:1 with zip and have large blocks of zeros when looking at them in a binary editor), so they're tiny in deep storage but large when expanded into memory on the historical nodes. I'd like to have the Druid cluster be able to quickly answer queries on the past 30 days of data, but spinning up 30 r3.2xlarges, which is what we'd need to keep this index memory resident, is going to be cost prohibitive.

I believe the problem is that our input data is very heterogeneous. We are a SaaS application, so instead of a single dataset, we really have a collection of datasets, one for each customer. There are about 5000 of these "projects" in our data. The amount of data per project is highly variable -- we're seeing everything from a few hundred rows per hour up to 1,000,000+ rows per hour depending on the project.

Each project has a variable number of dimensions that we're mapping to a fixed 100 dimension schema (used for group by queries). The values in each of these dimensions vary from project to project, so for example, we'll map a feature that has values "A", "B" and "C" from one project and a feature with values "X", "Y", and "9" from another project to the same "dimension12" in the Druid schema. In general, there is no way to force multiple projects into using common values, as the values are set by our customers. This, I believe, is where the problem arises: when the indexer creates indexes, it creates a bitmap for each possible value in a dimension, which in our case is the concatenation of all possible values across all projects in that segment. However, because individual values are tied to a single project, 99%+ of these bitmaps will be zero.

So far, we've tried both hash partitioning and per-dimension partitioning. The hash partitioning makes indexes that are bigger both compressed and expanded, so I think per-dimension partitioning is the way to go here. In addition, queries are always going to include project ID as a filter, so I think it makes sense to shard all data from a single project into one segment instead of requiring us to load multiple segments.

Do you guys know of any way we can shrink the size of these indexes? Some ideas we've considered:

- The roaring bitmaps in Druid 0.7 might compress better in memory -- we're going to spin up a 0.7 cluster this week to try.
- Force the indexer to generate one segment per project per time interval. This will give us a ton of small segments, but fewer empty bitmaps. We could then group segments together with an append task.
- Generate per-project dimensions. This would be a bit difficult since it would have to be done on the fly as projects add dimensions. I'm also not sure how well Druid would work with many (100,000+) mostly sparse dimensions.
- Generate per-project datasources. This would complicate both batch and real time indexing, since we'd have to have 5000+ separate index tasks. Also, I'm not sure how to configure the realtime tasks to generate multiple datasources from one kafka firehose.
- Add a "mapping service" layer to our ingestion process to map per-project dimension values into a single shared vocabulary.
- Modify the indexing jobs to generate per-project indexes and then append rather than merge them to form segments.

Any guidance you guys can provide would be greatly appreciated.

Xavier Léauté

Mar 2, 2015, 3:08:40 PM
to druid-de...@googlegroups.com
> Do you guys know of any way we can shrink the size of these indexes? Some
> ideas we've considered:

Existing bitmaps should be very sparse, since the concise compression
used by default is a variation of run-length encoding.

I wonder if in your case it's not the bitmaps, but the actual
dimension columns themselves, i.e. the arrays that map rows to
dimension values. If the columns themselves are sparse, then those
could be filled with lots of zeros. A solution to that would be to
have Druid also compress dimension columns in addition to metric
columns which are already compressed using LZ4.
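A quick illustration of why compressing those columns would help (not Druid code; zlib here is just a stand-in for LZ4, and the layout is a simplified model of a dictionary-encoded dimension column):

```python
import zlib

# Simulate a dictionary-encoded dimension column: 1,000,000 rows at one
# byte per row, where only one customer's slice of rows has real values
# and everything else is id 0 ("null").
rows = 1_000_000
column = bytearray(rows)                 # all zeros to start
for i in range(10_000, 30_000):          # one customer's 20k rows
    column[i] = (i % 19) + 1             # ids 1..19, a ~19-value dimension

compressed = zlib.compress(bytes(column))
print(len(column), len(compressed))      # ~1MB raw vs a few kB compressed
```

The mostly-null layout compresses away almost entirely, which is exactly the sparsity pattern described above.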

> - The roaring bitmaps in Druid 0.7 might compress better in memory -- we're
> going to spin up a 0.7 cluster this week to try.

Roaring may work better for very sparse data, but given how
data-dependent the compression is, it would be great to find out if
that helps or not.

One thing that would be helpful for us is if you could share one of
the meta.smoosh files inside the index zip file. That will give us an
indication of whether the index or one of the columns might be the
culprit.
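For anyone else digging into this, a sketch of that kind of per-column accounting. The parsing assumes the meta.smoosh layout I've seen in segment zips (a version line, then CSV rows of name,chunkNumber,startOffset,endOffset); treat the format details as an assumption rather than documented API:

```python
import csv

def column_sizes(meta_smoosh_path):
    """Report bytes used per column, largest first, from a meta.smoosh
    file found inside a Druid segment zip."""
    sizes = {}
    with open(meta_smoosh_path) as f:
        reader = csv.reader(f)
        next(reader)                       # skip the version line
        for row in reader:
            name, _chunk, start, end = row
            sizes[name] = sizes.get(name, 0) + int(end) - int(start)
    return dict(sorted(sizes.items(), key=lambda kv: -kv[1]))
```

Printing the top few entries should show whether bitmap indexes or the dimension value arrays are taking the space.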

Eric Tschetter

Mar 2, 2015, 3:17:10 PM
to druid-de...@googlegroups.com
Jon,

This is a great question. I'd like to start by restating the problem as I understand it.

You take multiple customers and put them in a single table. However,
each customer can have their own, disparate set of dimensions. What
you've done is say, "we'll set aside up to 100 dimensions and map them
to meaningful dimension names externally".

The problem you are running into, however, is that, for example, each
customer might have a "gender" dimension, but customer 1 might have it
mapped to dimension1, customer2 to dimension2, etc. This means that
each column might have its own set of "male", "female", "transgender",
etc. So, in the limit, all columns actually end up looking like "high
cardinality" columns.

When you look at your indexes, they appear to be really large compared
to the input data and you are thinking that is because of lots of runs
of 0's in the bitmap indexes. Am I understanding correctly?

Assuming that is all correct, I'd like to ask some questions.

Question line 1:

a) Do any of your customers currently use all 100 dimensions, or is
that number being set aside more as a safety?
b) Assuming it's set aside for "safety", are you currently actually
materializing all 100 dimensions (in your ingestion spec, are you
telling it to include all 100 dimensions, or are you allowing it to
see what dimensions actually exist and build the dimension set from
that?)
c) If it is the case that you are materializing all dimensions, I
wouldn't be surprised if the extra space is actually storing "null" in
the unused dimensions. We currently don't optimize columns of a
single value very well. They can effectively be optimized to a
constant and that might resolve your issue. That said, the better fix
would be to allow Druid to build the dimension set as it indexes
instead of materializing all of them. You can do this by using a
dimension "blacklist" instead of a "whitelist" in your indexing spec.
If you'd be willing to share your spec we can probably help point you
to it.
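A sketch of what that might look like in a 0.7-style ingestion spec (the field names are from my reading of the 0.7-era docs and the exclusion entries are made-up examples -- check both against your version): an empty dimensions list tells Druid to treat every input field that isn't excluded as a dimension.

```json
"dimensionsSpec": {
  "dimensions": [],
  "dimensionExclusions": ["timestamp", "count", "some_metric_field"]
}
```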


Question line 2:

How many columns do you actually have? That is, if you were to take
the set of dimensions from your current customers, what would the
superset of column names be?

Druid can handle "schema-less" dimensions meaning that it can just add
dimensions as it sees new data. Given that your data schema will be
customer specific and you are going to partition in a way that a
customer's data co-exists in the same segment, you should be able to
leverage the schema-less dimension sets to great effect.

That is, you can use the actual dimension names you get from your
customers and allow Druid to automatically build up the set of
dimensions that it sees. When you partition by customer, this will
mean that segments with customer X data will have customer X columns,
but not necessarily customer Y columns and vice versa (essentially
giving you a proxy of "per customer datasource" without as much
overhead). This also means that values would get to be re-used
between similarly named columns.

Druid can handle different segments with different schemas, so the
fact that the segments do not share a schema is nothing to be
concerned about.

Fwiw, my recommendation is to do the latter and leverage Druid's
schema-less columns. One thing of note for this, however, is that
handling of "null" values is still not fully consistent. The current
set of known bugs are on the ingestion side though and have to do with
empty string vs. null. On the query side, using 0.7, I do not expect
you to run into issues with null handling.

--Eric

jon.tr...@optimizely.com

Mar 2, 2015, 4:08:39 PM
to druid-de...@googlegroups.com
It very well could be the dimension columns. I'm only looking at it in a hex editor, so I'm not entirely sure what's what when I see a large expanse of zeros. How does one turn on LZ4 compression for metric columns?

I'm happy to share one of the segment files but I probably shouldn't post it publicly. Can I email you one at this address? They're less than 10MB zipped, but about 100-150MB uncompressed.

jon.tr...@optimizely.com

Mar 2, 2015, 4:20:14 PM
to druid-de...@googlegroups.com
Yes, your understanding of the question is exactly right. 

To answer your questions:

1. We are setting aside the 100 dimensions for safety, though it's difficult to answer how exactly the data is distributed. There's a bit of a chicken-and-egg thing going on here in that the product we're going to be using Druid to power doesn't exist yet, so all of the data I'm testing with is simulated -- basically I'm taking our existing product data and augmenting it with synthetically generated columns. 

The current set I'm playing with adds columns per project according to a NegBinomial(0.07, 1) distribution (mean of 13.3 columns per project) with a NegBinomial(0.05, 1) distribution (mean = 19) for the number of possible values per column. I then generate an assignment of values to columns for every user in a project. This is in many ways a worst-case test in that a) real data would not have values in a dimension occurring uniformly, b) I expect there to be some common vocabulary of values across projects and c) I believe our choices for both columns/project and values/column are conservatively large.

I will try using that blacklist method and see what happens. I presume the semantics are that it just takes each field in the input JSON as a dimension unless it's specifically excluded?

2. I will definitely try that. If we could actually use the real, per-customer columns instead of doing the mapping, that would save us some effort in other places as well. If it can shrink the index sizes too, then it will be a double win.
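For concreteness, the schema simulation in (1) boils down to something like this (a simplified sketch, not our actual pipeline code; since r = 1 these NegBinomials are just geometric distributions, so the standard library suffices):

```python
import random

random.seed(42)

def neg_binomial_r1(p):
    """NegBinomial(p, r=1) is geometric: count failures before the
    first success. Its mean is (1 - p) / p."""
    failures = 0
    while random.random() > p:
        failures += 1
    return failures

def project_schema():
    """One synthetic project's schema: a column count (mean ~13.3) and
    a cardinality (number of possible values, mean 19) per column."""
    n_columns = neg_binomial_r1(0.07)
    return [neg_binomial_r1(0.05) for _ in range(n_columns)]

# Sanity check: the empirical mean of columns/project should be ~13.3
mean_cols = sum(neg_binomial_r1(0.07) for _ in range(100_000)) / 100_000
print(round(mean_cols, 1))
```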

Xavier Léauté

Mar 2, 2015, 4:24:31 PM
to druid-de...@googlegroups.com
> It very well could be the dimension columns. I'm only looking at in a hex
> editor, so I'm not entirely sure what's what when I see a large expanse of
> zeros. How does one turn on LZ4 compression for metric columns?

Compression is always turned on for metric columns.

> I'm happy to share one of the segment files but I probably shouldn't post it
> publicly. Can I email you one at this address? They're less than 10MB
> zipped, but about 100-150MB uncompressed.

Yes, feel free to send to xav...@metamarkets.com.

jon.tr...@optimizely.com

Mar 2, 2015, 4:42:26 PM
to druid-de...@googlegroups.com
Thanks. I sent you the segment file via email.

And sorry, I meant how do I turn on compression for dimension columns? Is this a config or is it something that requires code modification?

Eric Tschetter

Mar 2, 2015, 4:55:11 PM
to druid-de...@googlegroups.com
You can't turn on compression for the dimension columns. I think
Xavier was talking about maybe a feature that we could consider
implementing.

Given your replies, I'm pretty certain the extra size you are dealing
with is going to be because of the extra columns with nothing ("null")
in them. Switching to a more sparse indexing strategy is going to be
your fix.

Also, fwiw, we generally recommend targeting ~512MB segment sizes.
You said your current segments are ~150MB; I'd recommend moving your
target row size up by 3x.

--Eric

Xavier Léauté

Mar 2, 2015, 5:18:04 PM
to druid-de...@googlegroups.com
Compression on dimension columns is something that will require code
changes. I'm looking at your files to see where most of the space is
being used.


Xavier Léauté

Mar 2, 2015, 5:45:02 PM
to druid-de...@googlegroups.com
Jon, looking at your data seems to confirm my suspicion.

- The biggest columns are about 4MB, with bitmaps and column data
split 50/50, not counting the 100kB worth of string lookups.
- The smallest ones are about 1MB in size, with bitmaps taking up 46
bytes, column data making up most of the rest, and strings taking up < 4kB

Given that you have about 1 million rows, each dimension is going to
require at least 1MB of column data (not counting bitmaps and string
values), and since you have about 100 columns, it makes sense that
your total index size is about 150MB.
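The arithmetic, spelled out (assuming one byte per row per dictionary-encoded dimension value, the minimum width when a column's value dictionary fits in 256 entries -- an assumption on my part):

```python
# Back-of-envelope version of the estimate above
rows = 1_000_000          # rows in the segment
dimensions = 100          # columns in the fixed schema
bytes_per_value = 1       # smallest dictionary-encoded id width

column_data = rows * bytes_per_value    # ~1MB per dimension column
total = dimensions * column_data        # ~100MB before bitmaps/strings
print(total)  # 100000000, i.e. ~100MB; bitmaps and string dictionaries
              # bring the observed total up toward ~150MB
```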

The easiest way to reduce the size of your data would therefore be to
add the ability to compress the dimension columns. That would shrink
those sparse columns tremendously. It's not necessarily hard to
implement, but it will require some changes to the underlying data
format.

Eric Tschetter

Mar 2, 2015, 6:10:07 PM
to druid-de...@googlegroups.com
Errr, Xavier, I disagree: the easiest way would be to not store
dimensions that don't exist (shouldn't even require a code change).

The second easiest way would be to optimize columns of a single value
into just the JSON metadata.

The third easiest would be to add LZ4 across the board for dimensions.
This one is also the riskiest as it potentially affects performance
and will need to be properly verified so that we understand the
memory/performance tradeoff.

--Eric

Xavier Léauté

Mar 2, 2015, 6:39:16 PM
to druid-de...@googlegroups.com
> The second easiest way would be to optimize columns of a single value
> into just the JSON metadata.

Agree, it would be best to handle those special cases separately. From
the cursory look I had at the data, the typical columns have more than
one value, but are otherwise very sparse, so compression seemed to be
the most straightforward way to fix things.

> The third easiest would be to add LZ4 across the board for dimensions.
> This one is also the riskiest as it potentially affects performance
> and will need to be properly verified so that we understand the
> memory/performance tradeoff.

We did some benchmarking on metrics compression recently, and the
limited testing we've done seemed to indicate that the overhead of
de-compressing LZ4 was relatively low for metrics aggregation. I'll
try to see if we can get the numbers.

Dimension values are ordered, so we can expect compression to work
relatively well.
Having less data to scan from memory may also give us better cache
locality, which would compensate some of the overhead of
de-compressing the data.

Overall, the incremental cost in terms of CPU is typically much less
than the incremental cost in terms of RAM, so compression is almost
always beneficial.

I believe it's something worth considering, although I agree that we
should better understand the performance tradeoffs. If we make it
configurable it should be easy to experiment with.

Eric Tschetter

Mar 2, 2015, 7:54:42 PM
to druid-de...@googlegroups.com
>> The second easiest way would be to optimize columns of a single value
>> into just the JSON metadata.
>
> Agree, it would be best to handle those special cases separately. From
> the cursory look I had at the data, the typical columns have more than
> one value, but are otherwise very sparse, so compression seemed to be
> the most straightforward way to fix things.

Ah, so there *are* some values in the "long tail" of dimensions? In
that case, then yeah, this optimization wouldn't work. I was thinking
that the issue was a bunch of columns that are just the value "null"
over and over and over again.


>> The third easiest would be to add LZ4 across the board for dimensions.
>> This one is also the riskiest as it potentially affects performance
>> and will need to be properly verified so that we understand the
>> memory/performance tradeoff.
>
> We did some benchmarking on metrics compression recently, and the
> limited testing we've done seemed to indicate that the overhead of
> de-compressing LZ4 was relatively low for metrics aggregation. I'll
> try to see if we can get the numbers.
>
> Dimension values are ordered, so we can expect compression to work
> relatively well.

Careful with this. Dimensions towards the left are ordered,
dimensions toward the right are also ordered, but they are chunked by
the dimensions on the left and thus will end up looking a lot more
random. Not really negating the fact that compression will shrink
things, just wanting to make sure that we don't make any assumptions
about what ordering might or might not buy us on wide tables.


> Having less data to scan from memory may also give us better cache
> locality, which would compensate some of the overhead of
> de-compressing the data.

I'm not sure I buy this necessarily. You have to decompress into a
temporary buffer and then read from that buffer, so the cache locality
of that buffer won't actually change.


> Overall, the incremental cost in terms of CPU is typically much less
> than the incremental cost in terms of RAM, so compression is almost
> always beneficial.

I agree that comparatively, compression will likely be a win when
weighed against the cost of the memory. When I did the metric
compression though, there was definitely a performance cost to
compressing, I expect us to find the same thing with dimensions.


> I believe it's something worth considering, although I agree that we
> should better understand the performance tradeoffs. If we make it
> configurable it should be easy to experiment with.

Yeah, we should probably start looking at the performance cost of
compressing columns. Especially given the information that all
columns are used, just most of them very sparsely used.

Overall I expect compressing them to be a win, I was just thinking
that if the question is one of implementing compression versus just
not persisting something that doesn't actually have any information in
it, not persisting is preferable.

--Eric

Gian Merlino

Mar 2, 2015, 11:10:10 PM
to druid-de...@googlegroups.com
Note that for schemaless ingestion (the blacklist method), to make the "segments with customer X data will have customer X columns, but not necessarily customer Y columns" thing happen, you have to actively do something to get your data partitioned by customer. Otherwise it'll be partitioned haphazardly and you'll probably still have a bunch of inefficiently sparse dimension columns.

You can do this in batch by using single-dimension-based partitioning and setting your partitionDimension to the column containing the customer id. In realtime, you will have to do it on the producer side. If you're ingesting directly from Kafka, you can provide a Partitioner to the Kafka producer. If you're using Tranquility, you can do the same thing by providing a beamMergeFn to the DruidBeams builder.
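For the batch side, that looks something like the following in the Hadoop tuning config (a sketch: the partitions-spec type name has varied across versions -- "dimension" in 0.7-era specs -- and "project_id" is a made-up stand-in for your customer id column):

```json
"partitionsSpec": {
  "type": "dimension",
  "partitionDimension": "project_id",
  "targetPartitionSize": 1000000
}
```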

Xavier

Mar 3, 2015, 3:39:27 PM
to druid-de...@googlegroups.com
> Yeah, we should probably start looking at the performance cost of
> compressing columns. Especially given the information that all
> columns are used, just most of them very sparsely used.

Just wanted to follow up on my earlier comment about the benchmarks we ran with
different compression methods. Charles is working on a blog post that will provide
a much more in-depth look at both metric and bitmap compression impact on
performance, but here are some preliminary results:

For a typical timeseries query, with 3 doubleSum, one longSum, and one count aggregators, without filter:
- LZ4 metrics compression is about 5% slower than uncompressed metrics
- Legacy LZF metrics compression is about 12% slower than uncompressed metrics

For a topN with the same aggregations on a high cardinality dimension (no filter)
- LZ4 is about 1% slower than uncompressed
- Legacy LZF is about 4% slower than uncompressed

For a groupBy query with same aggregations on a single low cardinality dimension (no filter)
- LZ4 is about 2% slower than uncompressed
- Legacy LZF is about 5% slower than uncompressed.

Of course, we'd like to run through a lot more combinations to understand the impact
of the number of aggregators, as well as dimension cardinality, and filtering, but hopefully
this gives us some color on what to expect.

jon.tr...@optimizely.com

Mar 3, 2015, 5:26:31 PM
to druid-de...@googlegroups.com
How effectively was the compression in reducing the size of the segments in memory? For our application, a <5% slowdown in query time would be a small price to pay for a significantly smaller RAM footprint.

Also, some updates on our tests:

- I reindexed an hour's worth of data using the schema blacklist method that Eric proposed. It didn't seem to work that well: total index size actually grew from 1.69gb to 2.19gb. The input data still had the fixed mapping to a common set of 100 dimensions.
- I regenerated the test data without the fixed dimension mapper, but the index failed due to too much physical memory usage on our hadoop cluster. I'm retrying it with tweaked tuning parameters.

Side question: is there any way to use a batch index task and manually specify a segmentation? I tried putting the "shardSpecs" field in the tuning parameters, but it still kicked off the determine-partitions hadoop job. I'd like to be able to go directly to the generate-indexes job, since the partitioning isn't actually changing from run to run (it's the same input).

jon.tr...@optimizely.com

Mar 3, 2015, 5:28:56 PM
to druid-de...@googlegroups.com
Thanks. We're already using single dimension partitioning for our batch jobs. I didn't realize that we'd need to do the partitioning in the Kafka producer, but I don't suspect that would be hard to implement. Another possibility would be to just let the realtime indexing tasks use hashed partitioning, since our plan is to do a batch re-index every 12-24 hours after cleaning the data on hadoop and relying on realtime indexed data only for recently received events.

Eric Tschetter

Mar 3, 2015, 5:30:41 PM
to druid-de...@googlegroups.com
Interesting that the index size grew; that doesn't really make sense
to me... I wonder if it ended up including some other columns as
well?

No, I do not believe there is a way to skip the determine partitions
job. It shouldn't be too difficult to add code that allows you to
pre-specify your partition boundaries though...

--Eric

jon.tr...@optimizely.com

Mar 3, 2015, 5:43:40 PM
to druid-de...@googlegroups.com
Not that I can see. For example, on a single segment, the only difference I can see is that the blacklist version has 7 columns missing that are included in the whitelist version (but I assume are empty). The sharding structure is the same, but the size grew from 138,031,852 bytes to 173,673,574 bytes. I can send you the before and after versions if you have tools that would allow you to compare them.

Eric Tschetter

Mar 3, 2015, 5:48:36 PM
to druid-de...@googlegroups.com
Do you have fewer segments created? Did you increase the target rows at all?

jon.tr...@optimizely.com

Mar 3, 2015, 6:01:26 PM
to druid-de...@googlegroups.com
No, both jobs created 386 segments for one day's data and both had the same target rows (1,000,000, max 3,000,000). For the hour I was using for the comparison, the segmentation is the same as far as I can tell.

Gian Merlino

Mar 3, 2015, 8:27:37 PM
to druid-de...@googlegroups.com
The index size growing after switching to schemaless could be due to #658.

Jon, have you tried indexing each customer's data in its own datasource? That'll at least take out the effect of some dimensions only being used by some customers. I think it'd be an interesting experiment even if you don't end up running that way in production.


jon.tr...@optimizely.com

Mar 3, 2015, 8:57:31 PM
to druid-de...@googlegroups.com
Yes, that's on my list of things to try. I think it would work very well for our application insofar as individual projects within our data are in independent silos: I can't think of any use case where we'd want to run queries that aren't conditioned on a single project ID. However, the total number of projects is large and is going to grow over time, so I'll need to write a job that will generate index task configs from the data itself. Ideally, I'd write a custom index generation job that will generate one datasource per project in the input data from a single hadoop job rather than having to fire off 5000+ individual tiny jobs.

In the meantime, I'm experimenting with target rows settings that will attempt to accomplish the same thing: i.e. specify a low number of target rows to force a roughly one segment = one project sharding but give a high max rows so that the job doesn't fail on the larger projects. It would be awfully nice if the single dimension sharding strategy could further subdivide a single large shard using either hashing or finer interval time buckets. I'm running into the issue that while we have lots of very small projects, we have at least one that's close to 40,000,000 rows per day. We'd get better performance on the smaller projects by aggregating daily and grouping a few of them in a 3-4m row segment while taking the huge guys and putting them into hourly 1-2m row segments. I've been studying the indexing hadoop job, but am still not quite sure how difficult this would be to implement.

Gian Merlino

Mar 3, 2015, 9:07:38 PM
to druid-de...@googlegroups.com
I think breaking up the larger projects like that is doable. It'd involve making a new kind of shard spec and changing the determine-partitions job a bit to take advantage of that.

Charles Allen

Mar 3, 2015, 9:12:16 PM
to druid-de...@googlegroups.com

Because it was asked earlier in the thread:


Percent change in median query execution time compared to 0.6.160 using concise-lzf
(more negative is better). Best in query class is marked with *.

Query                  Bitmap    LZ4 - Native  LZF - Native  Uncompressed
GroupByLowCardinality  Concise   -4.5%         -1.1%         -6.0% *
                       Roaring   -5.1%         -1.4%         -5.3%
GroupByRegex           Concise   -30.2%        -30.7%        -30.6%
                       Roaring   -39.1%        -39.5% *      -38.9%
Timeseries             Concise   -8.3%         -2.1%         -12.4%
                       Roaring   -8.3%         -2.1%         -13.0% *
TimeseriesFilter       Concise   -5.8%         +2.9%         -7.7% *
                       Roaring   +9.6%         +18.3%        +6.7%
TopN                   Concise   -40.4%        -38.6%        -41.2%
                       Roaring   -40.9%        -39.0%        -42.1% *
TopNRegex              Concise   -34.6%        -35.5%        -35.3%
                       Roaring   -47.4%        -47.8%        -47.9% *

The change in on-disk segment size in bytes compared to the comparable 0.6.160
(774,534,273):

           LZF                    LZ4                    Uncompressed
Concise    772,560,446 (-0.25%)   759,023,095 (-2.00%)   905,714,049 (+16.9%)
Roaring    831,786,452 (+7.39%)   818,249,101 (+5.64%)   964,940,055 (+24.6%)

Charles Allen

Mar 3, 2015, 9:13:37 PM
to druid-de...@googlegroups.com
It is worth noting that these queries only apply to data which is already paged into memory.

jon.tr...@optimizely.com

Mar 5, 2015, 1:46:22 PM
to druid-de...@googlegroups.com
So, some updates on this:

With our current synthetic data, I am not able to successfully index the data without using the mapping of per-project dimensions to a fixed set. 

Using single dimension partitioning, I'm having YARN kill my reducers due to excessive physical memory usage (mapreduce.reduce.memory.mb is set to 7200). With a large target row size (1,000,000) it appears to be blowing up in reducers that have many small projects packed together (so a large total number of dimensions in the segment). With a small target row size (100,000) it works except for our three largest projects with 25m, 35m, and 28m rows each, even though I have maxRowsInMemory set to only 1,000,000. I can probably try increasing the reducer memory -- max on our cluster is 12,600 -- but the job will end up running really long since there are 786 reducers with these settings and we'll only be able to schedule 30 at a time with that much memory allocated.

Using hash partitioning, the job blows up with a Java out of memory error. I haven't tried to tune this since I think hash partitioning is probably a non-starter given the distribution of our data and queries.

Some followup questions:

1. The docs recommend aiming for segment sizes in the 500-1000MB range. However, because of the peculiarities of our data, we seem to be getting tighter indexes from a one-project-per-segment or few-projects-per-segment sharding scheme, which often results in tiny (< 1MB) segments for low traffic projects. What is the actual effect of having a large number of small segments in the Druid system? In our case, all queries will be filtered by project ID, so is there going to be much harm from having, say, one 1MB single-project segment per day vs. trying to build a 500-project 500MB segment per day (which seems to be blowing up the index process)?

2. Is there any built in support for using EMR clusters for the batch indexing? Given the high memory use of the batch indexing job, we're thinking now about using an EMR cluster with r3.xlarge nodes to do our batch indexing instead of connecting it to our multi-tenant data mining cluster (32 m1.xlarge nodes). Ideally, I'd have the indexing task spin up an EMR cluster, do the indexing, then spin it back down once the output segments are in S3 deep storage. We can probably write such a task, but if there's existing support that would be easier.

I probably have to move on from these experiments, but this dialog has been very helpful. Being able to dynamically set the schema will simplify a lot of things in this product we're building. I'm not too worried about some of the indexing roadblocks we've been seeing, since it will be several months before we come anywhere close to seeing this volume of traffic on the new product. In that time I think we can work out a sharding scheme that works and doesn't blow up on Hadoop.

Thanks for all your help. I'm sure I'll have more questions as we get deeper into this project.

Gian Merlino

unread,
Mar 5, 2015, 6:48:02 PM3/5/15
to druid-de...@googlegroups.com
Thanks a lot for writing up the experiments you're doing!

For the memory stuff, it might be interesting to try running from master. PR #1097 is going into 0.7.1 and in many cases will lower the amount of data spilled and mmapped by the indexing reducers.

About your questions:

1) It depends on what "large" and "small" mean ;). FWIW, our cluster has about 500,000 segments. The biggest overheads per segment are probably that each one adds extra metadata for the coordinators and brokers to track, each one adds extra keys to the query cache (since it's a by-segment cache), and each one ends up as a separate file that needs to be queried separately on historical nodes (results are merged after each segment is queried).

2) There's nothing EMR-specific in there. We've used EMR in the past, and it works as long as you use the right jars, although our cluster was an always-alive sort of cluster, not one spun up on demand.

jon.tr...@optimizely.com

unread,
Mar 6, 2015, 7:35:59 PM3/6/15
to druid-de...@googlegroups.com
Thanks for all your help. We're hoping to have our 0.7.1 cluster up some time next week, so it will be interesting to compare against what I'm seeing on 0.6.171.

I think I finally have the fully dynamic schema version of the indexing working by tweaking some of the parameters and giving the reducers the maximum memory I can (12.6gb). Before, when we were mapping to a fixed 100-dimension format, we were seeing about 1.7gb per hour of indexes. With the fully dynamic schema, I'm seeing 1.5-2gb individual segments for ones with 100+ projects in them. I suspect what's happening is that in a segment with 100+ projects we might end up with 1000-10,000 dimensions, and the indexer is expanding the number of columns to accommodate all columns in all rows, even though almost all of these values will be null. If that's indeed the case, I'm going to configure the indexer to limit segments to < 10 projects, even if that results in a lot of small segments.
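A rough back-of-the-envelope model of why packing projects together inflates the indexes (pure illustration, not how Druid actually stores bitmaps -- real segments use compressed Concise bitmaps, so actual sizes differ):

```python
# Toy model: one bitmap per distinct dimension value, one bit per row.
# When P projects share a segment, each project's values get bitmaps
# that are zero for every other project's rows.

def total_bitmap_bits(projects):
    """projects: list of (rows, distinct_dimension_values) tuples."""
    total_rows = sum(rows for rows, _ in projects)
    total_values = sum(values for _, values in projects)
    # Every distinct value gets a full-height bitmap over all rows.
    return total_rows * total_values

# One project alone: 100k rows, 50 distinct values.
alone = total_bitmap_bits([(100_000, 50)])

# The same project packed with 99 similar ones: each bitmap is now
# 100x taller, and there are 100x as many of them.
packed = total_bitmap_bits([(100_000, 50)] * 100)

print(alone, packed, packed // alone)
```

Under this (uncompressed) model, packing 100 similar projects together makes the raw bitmap index 10,000x larger, with 99%+ of the bits zero -- which matches the "compresses 20-25:1 in deep storage but is huge in memory" behavior we're seeing.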

Can you explain to me the difference under the hood between an "append" task and a "merge" task? The docs imply that an append just concatenates a bunch of segments into a single file, but the source code looks like it's doing at least some merging of column indexes. If it's not doing a full merge like the batch indexer does, it might be a good compromise for us to generate one segment per project, then append a bunch of small ones together to simplify segment management. However, if appending two segments with non-shared dimensions results in an output that's bigger than the sum of the input segment sizes, then that won't work.

Gian Merlino

unread,
Mar 8, 2015, 4:55:53 PM3/8/15
to druid-de...@googlegroups.com
The append and merge tasks both create a single output segment that has the union of all column names present in the original segments. The difference is that the merge task tries to combine rows that should roll up together, and the append task doesn't try to do that. So I think neither will do what you want, since both of them will generate 'full-height' output columns.
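The semantics Gian describes can be sketched like this, with rows as (dimensions, metric) pairs (just an illustration of the append/merge distinction, not Druid's actual implementation):

```python
from collections import defaultdict

def append_segments(segments):
    """Concatenate rows; identical dimension combinations stay separate."""
    return [row for seg in segments for row in seg]

def merge_segments(segments):
    """Concatenate, then roll up rows with matching dimensions by summing."""
    rolled = defaultdict(int)
    for dims, count in append_segments(segments):
        rolled[dims] += count
    return list(rolled.items())

seg_a = [(("project=1", "featureA"), 10)]
seg_b = [(("project=1", "featureA"), 5), (("project=2", "featureX"), 7)]

appended = append_segments([seg_a, seg_b])  # 3 rows: nothing combined
merged = merge_segments([seg_a, seg_b])     # 2 rows: matching rows rolled up
```

Either way, the output carries the union of all columns, so per the above neither task avoids the full-height-column problem.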

Xavier Léauté

unread,
Mar 16, 2015, 3:18:29 PM3/16/15
to druid-de...@googlegroups.com
Jon, I have an early-stage implementation of dimension compression in
this branch:
https://github.com/metamx/druid/tree/dimension-compression

This is still at a very early stage: it is currently not compatible
with existing segments, and it only compresses single-value dimensions.
However, I thought you might be interested in giving it a shot, to see
how much it reduces your segment sizes. It uses the same compression
strategy as metric columns (64k LZ4 blocks).

Initial testing indicates that for sparse or low-cardinality columns
the size is typically reduced anywhere between 50% and 95%, depending
on the data. For high-cardinality columns, or ones with relatively
unique values, it can actually increase the size due to the additional
indexing overhead, somewhere between 3% and 20%.

Performance-wise there is still some work to be done; queries are
currently about 10-20% slower. The overhead varies with the complexity
of the query, with simple, low-cardinality top-N queries affected more
than complex, high-cardinality ones.

Let me know if you get a chance to try this out!

Xavier

Xavier Léauté

unread,
Mar 16, 2015, 7:37:59 PM3/16/15
to druid-de...@googlegroups.com
Following up on my previous post, I added support for compressed
integer columns in addition to longs.
That helps both with size, reducing the overhead by almost half, and
also seems to help with performance.
Queries are now only 4% to 8% slower on average than without
compression, which is much more acceptable.

If anyone wants to try it out, use
https://github.com/metamx/druid/tree/dimension-compression #42cb027

Xavier

Xavier Léauté

unread,
Mar 25, 2015, 7:54:08 PM3/25/15
to druid-de...@googlegroups.com
I just ran an experiment on real-world data sizes, and the dimension compression seems to be helping out quite a bit.

With dimension compression enabled, segments end up roughly 45-50% smaller than the original segments.

We're talking about roughly 60 million rows of aggregated data split across 15 shards, with 60 dimensions and 20 metrics.
Dimension cardinality is anywhere between 1 and 1 million values, with a median of 40 values, and a 75%ile around 6000.

In the next couple of days I'll send out a proposal to implement dimension compression with initial code to review.
Most of the discussion will probably be around how to make the compression configurable, and not so much about the actual compression implementation, since configurability is not something we can easily support in the current codebase.

Charles Allen

unread,
Mar 25, 2015, 8:16:47 PM3/25/15
to druid-de...@googlegroups.com
That's awesome Xavier! Out of curiosity, what compression were you using? 