Real-time Ingestion - Thoughts


Fangjin Yang

Jun 9, 2014, 9:43:23 PM
to druid-de...@googlegroups.com

In the early days of Druid, there were standalone real-time nodes, and for certain volumes of data and availability tolerances, they were sufficient. They pull data from a message queue like Kafka or RabbitMQ, index it locally, and periodically finalize segments for handoff to historical nodes. They are fairly straightforward to scale, simply taking advantage of the innate scalability of the backing message queue. But they are difficult to make highly available with Kafka, the most popular supported message queue, because its high-level consumer doesn’t provide a way to scale out two replicated consumer groups such that each one gets the same data in the same shard. They also become difficult to manage once you have a lot of them, since every machine needs a unique configuration.
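For context, each standalone real-time node in this pull-based setup is driven by its own realtime.spec, whose firehose block points at the queue. A minimal sketch of the Kafka flavor follows; exact property names varied by Druid version, so treat it as illustrative rather than copy-paste:

"ioConfig" : {
  "type" : "realtime",
  "firehose" : {
    "type" : "kafka-0.8",
    "consumerProps" : {
      "zookeeper.connect" : "zk.example.com:2181",
      "group.id" : "druid-example-group"
    },
    "feed" : "events"
  }
}

Every node carries its own copy of a file like this, which is exactly the per-machine configuration burden described above.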


Internally, our approach to solving the availability problem was to switch from a pull-based model to a push-based model; rather than Druid indexers pulling data from Kafka, we had another process pull data for them and push it into Druid. Then we switched on replication, since we could now ensure that the same data made it into the same shard in each replica. Our approach to the management problem was the indexing service, where a task-and-resources model replaces a standalone machine model. In addition to simplifying machine configuration, that model also lets us run in the cloud with an elastic number of machines.


If your needs are similar to ours, you can run something similar by pairing the indexing service with Tranquility, a library we wrote to make it easier to create push-oriented Druid indexing tasks and send data to them (https://github.com/metamx/tranquility). If you’d prefer to stick with pull-based, you can use the indexing service in a mode similar to standalone real-time nodes by submitting tasks yourself that use the Kafka firehose.
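To make the push-based option concrete, sending events through the Tranquility library looks roughly like the sketch below. This is reconstructed from memory of the project README; the builder method names are approximate, and the ZooKeeper address, dataSource, dimensions, and aggregator are made-up examples, so check the README for your version before relying on any of it:

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.Granularity;
import com.metamx.tranquility.beam.ClusteredBeamTuning;
import com.metamx.tranquility.druid.DruidBeams;
import com.metamx.tranquility.druid.DruidDimensions;
import com.metamx.tranquility.druid.DruidLocation;
import com.metamx.tranquility.druid.DruidRollup;
import com.metamx.tranquility.typeclass.Timestamper;
import com.twitter.finagle.Service;
import com.twitter.util.Future;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.BoundedExponentialBackoffRetry;
import org.joda.time.DateTime;
import org.joda.time.Period;
import java.util.List;
import java.util.Map;

// Curator client for the ZooKeeper ensemble Druid uses for service discovery.
CuratorFramework curator = CuratorFrameworkFactory.newClient(
    "zk.example.com:2181", new BoundedExponentialBackoffRetry(100, 3000, 5));
curator.start();

// Tell Tranquility where each event's timestamp lives.
Timestamper<Map<String, Object>> timestamper =
    event -> new DateTime(event.get("timestamp"));

Service<List<Map<String, Object>>, Integer> druidService = DruidBeams
    .builder(timestamper)
    .curator(curator)
    .discoveryPath("/druid/discovery")
    .location(DruidLocation.create("overlord", "druid:firehose:%s", "events"))
    .rollup(DruidRollup.create(
        DruidDimensions.specific(ImmutableList.of("page", "user")),
        ImmutableList.<AggregatorFactory>of(new CountAggregatorFactory("rows")),
        QueryGranularity.MINUTE))
    .tuning(ClusteredBeamTuning.builder()
        .segmentGranularity(Granularity.HOUR)
        .windowPeriod(new Period("PT10M"))
        .partitions(1)
        .replicants(2) // replication factor > 1, per the advice below
        .build())
    .buildJavaService();

// Sending a batch returns a Future of how many events were accepted.
Map<String, Object> event = ImmutableMap.<String, Object>of(
    "timestamp", new DateTime().toString(), "page", "foo", "user", "bar");
Future<Integer> accepted = druidService.apply(ImmutableList.of(event));

Tranquility creates, discovers, and pushes to the indexing-service tasks behind the scenes; the partitions/replicants tuning is what drives the replication discussed here.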


In either case, the main operational difference between standalone real-time nodes and indexing service real-time tasks is that a task is not currently able to restore its state after being killed and re-submitted. (Standalone real-time nodes can restore their state from disk after being restarted.) Because of this, we recommend running the indexing service the way we do: with push-oriented tasks and a replication factor higher than 1. We have some thoughts on how to improve the state of things if you have a different setup. Talk to us if you’re interested in helping :)

Deepak Jain

Aug 17, 2014, 10:42:35 AM
to druid-de...@googlegroups.com
Hi FJ,
Thanks for this post. Please take some time to answer my questions.
I understand the first part of the first paragraph.

1) Why do you say this? "But they are difficult to make highly available with Kafka, the most popular supported message queue, because its high-level consumer doesn’t provide a way to scale out two replicated consumer groups such that each one gets the same data in the same shard."
2) By highly available consumer, do you mean the real-time nodes? (Because they are the consumers of the Kafka queue.)
3) If my incoming stream of data is published to one Kafka topic and I have two consumer groups for that topic, then Kafka will send event e1 to both groups. Hence if all nodes in one consumer group go down, the other takes care of it. Is that not so?
4) What exactly is a shard here? How does it relate to real-time configurations?

Once I understand the problem with real-time nodes, I can see how Tranquility is better.

Going a bit ahead here.
If you pull data from the Kafka cluster into "something" and that "something" pushes data to real-time tasks (and not nodes this time):
a) What is this "something"? Is it a Java process and again a Kafka consumer? How did you start and code this?
b) If this "something" is a single node then obviously it can become a single point of failure. I am pretty sure it's not.

Please explain in detail where possible.

Regards,
Deepak

Deepak Jain

Aug 17, 2014, 10:44:00 AM
to druid-de...@googlegroups.com
Missed the 0th question.

0) What does "highly available" mean? Do you mean that if one real-time node goes down, another should automatically take over? What happens to the event that was being processed when the first node went down? Is there not data loss here?

Deepak Jain

Aug 19, 2014, 8:04:19 AM
to druid-de...@googlegroups.com
Could someone please respond?

Fangjin Yang

Aug 19, 2014, 2:49:07 PM
to druid-de...@googlegroups.com


On Sunday, August 17, 2014 7:42:35 AM UTC-7, Deepak Jain wrote:
Hi FJ,
Thanks for this post. Please take some time to answer my questions.
I understand the first part of the first paragraph.

1) Why do you say this? "But they are difficult to make highly available with Kafka, the most popular supported message queue, because its high-level consumer doesn’t provide a way to scale out two replicated consumer groups such that each one gets the same data in the same shard."

Druid replicates segments such that logically equivalent data segments are concurrently hosted on N nodes. If N–1 nodes go down, the data will still be available for querying. On real-time nodes, this process depends on maintaining logically equivalent data segments on each of the N nodes, which is not possible with standard Kafka consumer groups if your Kafka topic requires more than one consumer (because consumers in different consumer groups will split up the data differently).

So for example, let's say your topic is split across Kafka partitions 1, 2, & 3 and you have 2 real-time nodes with linear shard specs 1 & 2. Both of the real-time nodes are in the same consumer group. Real-time node 1 may consume data from partitions 1 & 3, and real-time node 2 may consume data from partition 2. Querying for your data through the broker will yield correct results. 

The problem arises if you want to replicate your data by creating real-time nodes 3 & 4. These new real-time nodes also have linear shard specs 1 & 2, and they will consume data from Kafka using a different consumer group. In this case, real-time node 3 may consume data from partitions 1 & 2, and real-time node 4 may consume data from partition 2. From Druid's perspective, the segments hosted by real-time nodes 1 and 3 are the same, and the segments hosted by real-time nodes 2 and 4 are the same, although they are reading from different Kafka partitions. Querying the data will yield inconsistent results. Fixing this problem requires rewriting the Kafka firehose to use the low-level consumer.
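You can see the root cause in the Kafka 0.8 high-level consumer API itself, which is what the Kafka firehose sits on. A minimal sketch (topic and group names are made up); note there is no knob anywhere to control which partitions end up backing which stream:

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class ReplicaConsumer {
  public static void main(String[] args) {
    // Replica set A runs with group.id "replica-a", replica set B with "replica-b".
    Properties props = new Properties();
    props.put("zookeeper.connect", "zk.example.com:2181");
    props.put("group.id", args[0]);

    ConsumerConnector connector =
        Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

    // Ask for one stream over the "events" topic. Kafka decides which of the
    // topic's partitions feed this stream, and it decides independently for
    // each consumer group. Nothing here lets "replica-a" and "replica-b"
    // agree on the partition-to-stream mapping, so replicated shards diverge.
    Map<String, List<KafkaStream<byte[], byte[]>>> streams =
        connector.createMessageStreams(Collections.singletonMap("events", 1));
  }
}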

Is this always a problem? No. If your data is small enough to fit in a single Kafka partition, you can replicate without issues. Otherwise, you can run real-time nodes without replication.

Another problem with real-time nodes is that when you need to change your schema, you must update the realtime.spec file and restart the real-time node, which becomes tedious when you have numerous real-time nodes.

For these reasons, we built the indexing service. It manages automatic schema migrations as well as correct replication of data and autoscaling.
 
2) By highly available consumer, do you mean the real-time nodes? (Because they are the consumers of the Kafka queue.)

Yes. Specifically the logic is in the Kafka firehose.

3) If my incoming stream of data is published to one Kafka topic and I have two consumer groups for that topic, then Kafka will send event e1 to both groups. Hence if all nodes in one consumer group go down, the other takes care of it. Is that not so?

See above.
 
4) What exactly is a shard here? How does it relate to real-time configurations?

Druid partitions data by time interval first. However, let's say you partition your data by hour but the hourly segments are still too large. Druid can further partition data by high-cardinality dimensions or using a hash (single-dimension shard spec vs. hashed shard spec). The same concept applies to real-time segments: if you have hourly segments but are generating too much data per hour, you can further shard out the data for that hour.
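Concretely, the "shard" is declared in each real-time node's realtime.spec via the shardSpec block; the earlier example with linear shard specs 1 & 2 corresponds to configs along these lines (a sketch of that era's syntax):

node with shard 1:  "shardSpec" : { "type" : "linear", "partitionNum" : 1 }
node with shard 2:  "shardSpec" : { "type" : "linear", "partitionNum" : 2 }

A replica is simply another node whose spec declares the same partitionNum, which is why the consumer-group behavior above matters: nodes claiming the same partitionNum must actually hold the same data.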

Once I understand the problem with real-time nodes, I can see how Tranquility is better.

Going a bit ahead here.
If you pull data from the Kafka cluster into "something" and that "something" pushes data to real-time tasks (and not nodes this time):
a) What is this "something"? Is it a Java process and again a Kafka consumer? How did you start and code this?

Others in the forum have created a standalone Tranquility server that consumes from Kafka and pushes to the indexing service.
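The bridge itself can be quite small. A purely hypothetical sketch, reusing the Kafka streams and the Tranquility druidService from the earlier sketches (objectMapper is a Jackson ObjectMapper; batching and error handling omitted):

import com.fasterxml.jackson.core.type.TypeReference;
import kafka.message.MessageAndMetadata;

// Hypothetical glue, not a shipped component: drain one Kafka stream and
// push each event into Druid through Tranquility.
for (MessageAndMetadata<byte[], byte[]> record : streams.get("events").get(0)) {
    Map<String, Object> event = objectMapper.readValue(
        record.message(), new TypeReference<Map<String, Object>>() {});
    // apply() returns a Future<Integer>; a real bridge would batch sends and
    // check the Future for failures instead of firing and forgetting.
    druidService.apply(ImmutableList.of(event));
}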

ghy...@gmail.com

Nov 2, 2015, 2:07:25 AM
to Druid Development
I have another question about Tranquility:
Can I query immediately after I have ingested data via Tranquility?
As far as I know, the realtime node has this ability. What about Tranquility? Would you give a detailed explanation?

On Wednesday, August 20, 2014 at 2:49:07 AM UTC+8, Fangjin Yang wrote:

Gian Merlino

Nov 2, 2015, 11:23:10 AM
to druid-de...@googlegroups.com
Yes, you can query data loaded with Tranquility immediately after sending it. The realtime tasks run by middleManagers at Tranquility's behest are sort of little mini-realtime-nodes: they use a lot of the same indexing, querying, and handoff code.
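For example, a timeseries query POSTed to the broker's /druid/v2 endpoint a moment after sending will already count the new rows, since the broker merges results from historical nodes and the still-open realtime tasks (dataSource, interval, and aggregator below are made up):

{
  "queryType" : "timeseries",
  "dataSource" : "events",
  "granularity" : "minute",
  "aggregations" : [ { "type" : "count", "name" : "rows" } ],
  "intervals" : [ "2015-11-02T00:00:00Z/2015-11-03T00:00:00Z" ]
}

The caveat is the window period: events are queryable from the task while it is open, and handoff later moves them to historical nodes without a gap from the query side.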


Yuval Levav

Jan 12, 2016, 2:20:56 AM
to Druid Development
Hi,
Can you please describe the architecture setup in this case?
What Druid nodes do I need?
Which node do I query for real time?
Are there preferences for the amount of data sent/indexed in each pull operation?

Thanks,
Yuval 

Anna Savarin

Feb 3, 2016, 12:08:33 PM
to Druid Development
Hi Fangjin,

Thank you for providing a detailed example of how replication can yield inconsistent query results. Is the same true in the case of only 2 RT nodes (same shardSpec partitionNum)? If both consume from N Kafka partitions, is there still a danger of them having logically inequivalent segments (since they would represent different consumer groups)?

Please advise.  Thank you for your time.

Best,

Anna!

Anna Savarin

Feb 3, 2016, 3:03:52 PM
to Druid Development
Actually, I can answer my own question. Kafka's ordering guarantees fall short when it comes to multiple partitions per consumer, so I think the answer is, "yes, there is a danger." I imagine this could be handled by using a single firehose, or, well, an indexing service... The question then becomes: is the indexing service the new SPOF?

Fangjin Yang

Feb 5, 2016, 8:02:09 PM
to Druid Development
Anna, the indexing service is not a SPOF. Druid has no SPOF by design. Think of the indexing service as more programmatic realtime nodes.

Perhaps the 0.9.0 docs will help explain the limitations of RT nodes:

The TL;DR for the Kafka high-level consumer is that you can't guarantee that two Druid shard specs that are supposed to be replicas of each other will read from the same Kafka partitions if there are multiple partitions for a datasource and multiple shards for that datasource as well.

Henry Kwan

Feb 16, 2016, 9:45:32 PM
to Druid Development
Does the RabbitMQ firehose have the same limitations as the Kafka consumer for standalone real-time nodes?

Fangjin Yang

Feb 17, 2016, 4:23:34 PM
to Druid Development
It'll have the limitations of manual partitioning and management AFAIK, but possibly not the data-correctness problems.

Hao lv

May 10, 2016, 3:12:31 AM
to Druid Development
Hi Fangjin,

I have a little question about your reply.
In the post, you mentioned an example of an incorrect query result:
node 1 consumes partitions 1 & 3, and node 2 consumes partition 2;
node 3 consumes partitions 1 & 2, and node 4 consumes partition 2.
Nodes 3 & 4 are in the same group, right? How can neither of them consume partition 3?
Do you mean node 4 consumes partition 3?


Regards,
Hao lv


On Wednesday, August 20, 2014 at 2:49:07 AM UTC+8, Fangjin Yang wrote:

Fangjin Yang

May 10, 2016, 5:44:24 PM
to Druid Development
Hi Hao, as you'll note in the 0.9.0 docs, most of the getting-started material no longer talks about realtime nodes because of their limitations.

Nodes 3 & 4 are in the same group, but you have no control over whether node 3 consumes partitions 1 & 2 and node 4 consumes partition 3, or node 3 consumes partitions 1 & 3 and node 4 consumes partition 2. The Druid segments exposed by nodes 1 & 3 may look the same, but internally the data can be completely different. During queries, this can lead to very different results.

Danny

Aug 9, 2017, 6:05:17 PM
to Druid Development
Hi Fangjin,
Thanks for the good info. When I tried Tranquility and changed the config from 1 partition to:

"properties" : {
  "task.partitions" : "2"
}

and I have 2 middle managers with enough slots available, I do not see the speed increase I expected. Max speed is 50k events/second.
The documentation is not detailed enough. Please tell me whether my following guesses are correct:
1. If events are partitioned by some mechanism or by hashing, different (hashed) events are pumped to 2 different partitions; theoretically, should the speed be twice as fast as with one partition? Unless there is a bottleneck in partitioning/hashing the data.
2. Is there one task handling the writes for each partition?
3. There is no detailed way to specify how to partition. Is it some kind of hash? How is it done? Can some dimension be specified to partition on?

Thanks!
Danny

On Monday, June 9, 2014 at 6:43:23 PM UTC-7, Fangjin Yang wrote:

Danny

Aug 9, 2017, 6:08:46 PM
to Druid Development
One more question:
If I want to achieve a faster ingestion speed, e.g. 500k events/s, can I use several Tranquility clients with the same config pumping to the partitions to maximize the ingestion speed?