Apache Calcite SQL adapter for Druid


Julian Hyde

Jun 14, 2016, 3:01:15 PM
to druid-de...@googlegroups.com
A couple of months ago, I mentioned on this list that I was developing a Druid adapter for Apache Calcite. The initial version of that adapter is complete, and has just been released as part of Calcite 1.8.0.

If anyone would like to kick the tires, the easiest way might be to build from source. Using JDK 1.7 or 1.8, Maven 3.2.1 or later, and Druid 0.9.0:

$ git clone https://github.com/apache/calcite.git
$ cd calcite
$ git checkout calcite-1.8.0
$ mvn -DskipTests install
$ ./sqlline
> !connect jdbc:calcite:schemaFactory=org.apache.calcite.adapter.druid.DruidSchemaFactory;schema.url=http://localhost:8082;schema.coordinatorUrl=http://localhost:8081 admin admin
> !tables
+-----------+-------------+------------+
| TABLE_CAT | TABLE_SCHEM | TABLE_NAME |
+-----------+-------------+------------+
|           | adhoc       | foodmart   |
|           | adhoc       | wikiticker |
|           | metadata    | COLUMNS    |
|           | metadata    | TABLES     |
+-----------+-------------+------------+
> select count(*) from "wikiticker";
+---------------------+
| EXPR$0              |
+---------------------+
| 39244               |
+---------------------+
1 row selected (1.005 seconds)
> explain plan for select count(*) from "wikiticker";
EnumerableInterpreter
  DruidQuery(table=[[adhoc, wikiticker]], projects=[[]], groups=[{}], aggs=[[COUNT()]])
1 row selected (0.047 seconds)
> !quit

Change the URLs in your connect string as appropriate for your Druid configuration. Calcite should find your data sources automatically and expose them as tables that you can query.
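For example, a query like the one below should plan as a single DruidQuery (you can check with "explain plan for"), with the filter, grouping, aggregation and sort/limit handled by Druid rather than by Calcite. The column names are an assumption based on the standard wikiticker quickstart dataset, so adjust them to your own schema:

select "countryName", count(*) as "cnt", sum("added") as "added_chars"
from "wikiticker"
where "channel" = '#en.wikipedia'
group by "countryName"
order by "added_chars" desc
limit 5;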

There are also pre-built jars in Maven Central:

<dependencies>
  <dependency>
    <groupId>org.apache.calcite</groupId>
    <artifactId>calcite-core</artifactId>
    <version>1.8.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.calcite</groupId>
    <artifactId>calcite-druid</artifactId>
    <version>1.8.0</version>
  </dependency>
</dependencies>

Please give it a try. There is more online documentation [1]. If you find bugs, log a JIRA case [2] or post to Calcite's dev list [3].

Julian

[1] http://calcite.apache.org/docs/druid_adapter.html

[2] https://issues.apache.org/jira/browse/CALCITE

[3] https://mail-archives.apache.org/mod_mbox/calcite-dev/

Xavier Léauté

Jun 14, 2016, 4:12:28 PM
to druid-de...@googlegroups.com
Looking pretty sweet! Excited to see what comes out of this.



Kurt Young

Jun 15, 2016, 9:27:05 PM
to Druid Development, jh...@apache.org
Really exciting to see this. BTW, we have developed a Druid plugin based on Drill, which also uses Calcite as its core SQL engine; maybe we could share some experiences.

Julian Hyde

Jun 16, 2016, 8:50:48 PM
to druid-de...@googlegroups.com
Using Drill as a runtime engine makes a lot of sense. It is able to do things like shuffle joins.

Most of my efforts went into the planner rules, ensuring that we push as much down to Druid as possible. If we can pool resources on that part of the problem, that would be great. 

Who is “we”, by the way?

Julian


Kurt Young

Jun 16, 2016, 11:43:36 PM
to Druid Development, jh...@apache.org
We are a team from the Alibaba search group :)

Besides pushing as much down to Druid as possible, we also let Drill talk to Druid's historical nodes directly to remove the bottleneck of Druid's broker. As a next step, we will try to let Druid return "ValueVector" instead of JSON to reduce the cost of ser/deser. And we will try to push these efforts back to Drill's community soon. Hopefully we can pool resources starting from there.

Kurt

minfeng xu

Jun 20, 2016, 12:57:01 AM
to Druid Development, jh...@apache.org
One question: in your solution, you mentioned "let Drill talk to Druid's historical nodes directly", so what about streaming data in peons or realtime nodes?
And please explain: why is Drill faster than Druid's broker? I think the bottleneck can be removed by adding more brokers.

Kurt Young

Jun 20, 2016, 9:22:43 PM
to Druid Development, jh...@apache.org
In the current version, only batch-ingested data is considered; we will try to figure out how to deal with realtime data later. But the main approach will be the same: get the segment timeline and server locations from ZooKeeper and talk to the servers directly. For the simplest queries that Druid is capable of, the broker's performance is a little better than Drill + Druid, which means Druid is doing some really great work :)

But other cases that involve more complicated queries, like huge groupBys (millions of groups), joins, and window functions, can only be handled by Drill + Druid.

When I say the broker is a bottleneck, I mean within a single query. Druid's broker acts as a "merger" node in query execution, and all the merge work is done in one worker (mostly one thread).

Gian Merlino

Jul 6, 2016, 3:05:02 AM
to druid-de...@googlegroups.com, jh...@apache.org
Kurt, Julian, with the Calcite-to-Druid and Drill-to-Druid stuff you're working on, are you mostly using "groupBy" and "select" queries? How have you found their performance to be? Recently I've been thinking about what we can do to improve those two, since they seem useful for SQL adapters, which a lot of people seem to be working on right now :)

For "select" I was wondering about a query that works a little differently, perhaps doing dictionary lookups for blocks of rows rather than for each row and using a different serialization format, like ValueVectors or something similar. We could also save the needless utf8 decode/re-encode by sending strings as they exist in the segment. We could also raise the usable threshold (even to unlimited) by streaming segment data directly through the jetty thread rather than using the processing pool – although I'm not as sure about this one, since it would make it tougher to control cpu use.

For "groupBy" I did some work recently to speed that up (the "v2" strategy) but I think there's more we could do, like pushing down limits when possible, and using more compact keys or direct array accesses when dimension cardinality is low.

Gian

Julian Hyde

Jul 7, 2016, 10:31:32 PM
to druid-de...@googlegroups.com
Yes, mainly “groupBy” and “select”; also “topN”. I haven’t done any performance tests yet, but my gut says that there will be two ways to improve performance.

1. Use a more compact file format. This will save network IO but, more importantly, reduce GC pressure on both the Druid side and the consumer side. Even better, allow the consumer to choose its own format, so that the consumer doesn’t need to transcribe when the data arrives. Apache Arrow format (the successor to ValueVectors) would be the preferred format for Apache Drill and quite a few other engines.

2. Return the results in parallel to a distributed compute engine such as Apache Hive, Drill or Spark; in general there would be N historical nodes writing to M consumers. I can imagine that this would get quite messy, because what is returned might be partially aggregated rows that need to be routed to other rows with the same key, merged and finalized into totals, maintaining as much parallelism as possible. In other words, do what the broker does, but inside the consumer. Each engine will need to solve this problem, hopefully without copy-pasting code from the broker.
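In SQL terms, the merge each consumer would perform is essentially the combining phase of a two-phase aggregation. A loose sketch for a simple COUNT (the "partials_from_historical_1"/"partials_from_historical_2" table names are hypothetical, standing in for the partial result sets arriving from two historical nodes; aggregators like hyperUnique would also need their own combine and finalize steps):

select "countryName", sum("partial_count") as "total_count"
from (select "countryName", "partial_count" from "partials_from_historical_1"
      union all
      select "countryName", "partial_count" from "partials_from_historical_2") as "partials"
group by "countryName";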

Both points are worth doing, but parallelism will yield bigger returns.

Julian


Gian Merlino

Jul 8, 2016, 12:08:48 AM
to druid-de...@googlegroups.com
For #1, by "file format" do you mean the serialization format for the data transferred over the wire? Arrow sounds good for that.

I believe Kurt's team is already doing #2, based on him saying "we also let Drill talk to Druid's historical nodes directly". I'm not sure whether they are using code from the Druid broker to do that or not. Sparkline's SparkSQL stuff (https://github.com/SparklineData/spark-druid-olap) is similar: it talks directly to historical nodes from Spark executors. Last I checked, they're not using Druid code; they have their own merging stuff.

Gian

丁凯剑

Jul 8, 2016, 2:15:26 AM
to Druid Development
I'm from Kurt's team.


We let Drill talk to each historical that is needed by the query.
We start a simplified version of the Guice client (like CliBroker) to get the segment<->historical info from ZooKeeper (reusing Druid code, like SegmentServerView). This client is very lightweight, and we can get the segments via serverView.getTimeline(dataSource).

For #2, we let Drill do the merge (which runs in parallel); Drill acts as a more powerful Druid broker group.
Drill does the merge as soon as part of the data comes back from each historical, so we created streaming versions of the Druid query types that return data as early as possible (in small batches, like 20,000 records each time): streaming-groupby-query, streaming-dimension-query (a streaming version of the search query), and ScanQuery (a streaming version of the select query).
This way, we don't hit the upper limit of each query, such as the 500,000-row limit on the select/groupby/search queries.

So for #1, it would be awesome if we could use Apache Arrow for these small batches to reduce the ser/deser cost!!

Kurt Young

Jul 10, 2016, 9:36:29 PM
to Druid Development, jh...@apache.org
Hi Gian,

Sorry for the late response; I've been on vacation for the last few days.

We have implemented some plans for Drill to make all of Druid's query types work in the right situations; this includes not only the "select" and "groupBy" queries but also "TimeSeries", "TopN" and "Search". From our experience, there are mainly 3 areas where performance can be improved.

#1 is about the "select" and "groupBy" queries. You're right, these do have performance problems, but it's not so much about how they are "implemented" as how they "work". Both queries need to materialize the whole result before they return, which is why there are limits for "select" and a default 500,000-row limit for "groupBy". The first thing we did was to extend these two queries with a "streaming" version. "Streaming select" acts just like "scan" on HBase: it has no limits or pagination identifiers, and any row that passes the filter is returned in small batches. "Streaming groupBy" works just like Druid's current implementation, but returns results every "maxIntermediateRows". It is Drill's job to merge all the small results, and from the outside the whole execution is parallelized. For now, we can run the "select" query on a datasource containing billions of rows, and we can run groupBy queries whose result sizes are in the tens of millions.

#2 is about Druid's return format, which is currently JSON/Smile. We found that a lot of CPU is spent on JSON ser/deser, so the straightforward idea is to provide a third format for Druid; I think ValueVector or Arrow would be great. We haven't implemented this yet, but I did a simple benchmark, and it showed roughly a 30% performance boost.

#3 is about vectorized execution. Since we want Druid to return a small batch of results each time to parallelize the whole execution, it would make a lot of sense to let Druid do this in a vectorized mode. We have not implemented this either, because I think @xavier is working on it now.

For #1, we may consider raising a PR so that others can share the work, and for #2 and #3, we're glad to help if you guys think they are reasonable.

Thanks
Kurt

Julian Hyde

Jul 11, 2016, 12:07:52 PM
to druid-de...@googlegroups.com
I agree with what Kurt says, and furthermore I’ll point out that #2 and #3 are linked. If you have an inefficient return format such as JSON, the producer (Druid) and consumer (say Drill, Hive or Spark) will both incur extra GC costs and extra bus waits that may not even show up as CPU time in a simple benchmark. If you have a good in-memory format such as Arrow the CPU can jump into vectorized operation the moment the data comes out of the network, without the considerable friction involved in copying and transcribing the data.

Gian Merlino

Jul 12, 2016, 2:33:01 PM
to druid-de...@googlegroups.com
Streaming select and groupBy both sound really useful (I could still see a valid use case for having an optional threshold though – maybe you want to stream the first/last 100k rows or something). If you guys wrapped that part up into a PR then that'd be awesome. Especially if they are flags to the existing queries rather than new query types.

Arrow as a wire format also sounds good to look into. Kurt, was the 30% boost you noticed just from using it as a serialization format, or did you make more sweeping changes? We could also take a look at using Arrow for transferring results within and out of the query engines. e.g. for the groupBy query, using Arrow batches instead of sequences of MapBasedRows.

Gian

Nishant Bangarwa

Jul 12, 2016, 3:02:26 PM
to druid-de...@googlegroups.com
Agree with Gian, streaming select and groupBy queries would be super useful.

@Kurt, when Drill talks to Druid historical nodes, does it reuse parts of the existing Druid broker code, or did you end up writing most of it yourself?
FWIW, with different query engines like Drill, Spark, etc. integrating with Druid, it would be really nice to have a library that understands Druid result formats and can be used by query engines to figure out segment locations and query historical nodes directly.

Kurt Young

Jul 14, 2016, 1:48:22 AM
to Druid Development
Hi Gian,

I didn't do a serious performance test, only a small demo. I tested with a TimeSeries query in a unit test, so it didn't involve the broker or the network. I compared these two situations:
a. open index + query + format result as JSON + deserialize from JSON + wrap it into ValueVector
b. open index + query + format with ValueVector

Kurt Young

Jul 14, 2016, 1:58:21 AM
to Druid Development
Most of the data structures are reused from Druid, like Query and Filter. We also copied some code from SegmentServerView and ServerInventoryView, just enough to make it work, since that code involves lots of Guice things.

Eric Tschetter

Jul 15, 2016, 12:18:11 AM
to druid-de...@googlegroups.com
I just want to say, I think everything in this thread is awesome!  Thank you everybody for spending the time and energy!

Julian Hyde

Jul 15, 2016, 2:54:40 PM
to druid-de...@googlegroups.com

> On Jul 14, 2016, at 9:18 PM, Eric Tschetter <eche...@gmail.com> wrote:
>
> I just want to say, I think everything in this thread is awesome! Thank you everybody for spending the time and energy!

+1 (feeling the love)


Gian Merlino

Jul 25, 2016, 8:43:53 PM
to druid-de...@googlegroups.com, jh...@apache.org
Hey Kurt,

Any chance you could raise a PR for any streaming 'select' work you've already done? It'd be great to start improving the select query!

Gian

丁凯剑

Jul 27, 2016, 9:51:10 PM
to Druid Development, jh...@apache.org
Will raise a PR this week about our work on streaming select, but it is not fully tested from the broker side. @Gian

On Tuesday, July 26, 2016 at 8:43:53 AM UTC+8, Gian Merlino wrote:

Gian Merlino

Jul 28, 2016, 1:40:26 PM
to druid-de...@googlegroups.com, jh...@apache.org
Great, sounds awesome. Looking forward to it!

Gian

丁凯剑

Aug 1, 2016, 1:54:11 PM
to Druid Development, jh...@apache.org
https://github.com/druid-io/druid/pull/3307 @Gian
Though it works when talking to historicals directly, there is still more work to do to make it usable from the broker side.

On Friday, July 29, 2016 at 1:40:26 AM UTC+8, Gian Merlino wrote:

Deepya Maddi

Feb 12, 2018, 8:50:16 PM
to Druid Development
I followed the steps above to set up Calcite, and am getting my hands dirty by running some simple queries on "druid-wiki-model". I can only run "explain plan for" and "describe" queries. When I try to run a "select" query, I see "Error while executing druid request". Is there anything I am missing?

Gian Merlino

Feb 13, 2018, 10:59:34 AM
to druid-de...@googlegroups.com
Does Calcite tell you the specific error it got?

Gian


林学维

Mar 19, 2018, 9:54:39 AM
to Druid Development
How does Calcite recognize intervals from SQL?
I've tried the following SQL, but it doesn't get set in the intervals JSON property:

where '__time' > TIMESTAMP '2018-03-18 00:00:00' and '__time' < TIMESTAMP '2018-03-19 00:00:00'

And there is no further documentation on that. Can you help?
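For reference: in Calcite SQL, single-quoted '__time' is a character literal rather than a column reference; identifiers need double quotes. A rough sketch of the kind of predicate the adapter can normally translate into the Druid query's intervals, assuming the table exposes its time column as "__time":

where "__time" >= TIMESTAMP '2018-03-18 00:00:00'
  and "__time" < TIMESTAMP '2018-03-19 00:00:00'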