Does Tranquility need Kafka?


Preetam Rao

Jul 9, 2014, 1:40:31 AM
to druid-de...@googlegroups.com
Hi,

Had some basic questions.

1. I have yet to get Tranquility to work. But from the discussions in the list, I assume we can directly ingest events from Storm into Druid via Tranquility without needing anything in between, including Kafka. Is that correct?

2. What is the real-time ingestion performance using Tranquility (or even just standalone real-time ingestion)? Could you please share some sample numbers from your prod, if you don't mind? We are expecting incoming traffic of about 200K RPS, each event containing data on some user actions. For now assume about 10-15 string fields. We are trying to figure out: a) whether a queue is needed in between (Kafka/Redis etc.) to buffer the performance differences, and b) whether we need time-based pre-aggregation at the app ingestion level to reduce the ingestion load on Druid.

3. Could you please elaborate more on the issue with Kafka and high availability?
a) Is it correct that the only way to make "real time" nodes highly available is by ingesting identical data into "all" of them at the app level? Is it the case that only historical nodes replicate segments for high availability, and not real-time nodes?
b) What is the issue with Kafka? Is it that multiple consumer groups would need to be created and we would have to explicitly name the consumer group in each host's Kafka firehose config, thus making it less manageable? But if we were OK with that and had the infrastructure to easily set per-host settings via some console, would you still consider it troublesome?

4. Is there any performance difference between EventReceiverFirehoseFactory & Tranquility? If my Storm bolt made HTTP REST calls to Druid instead of using Tranquility, would we gain anything (because that looks simpler)?

Apologies if I am missing the basics, because I have not really played around with either Kafka or the Druid/Kafka integration. But I was able to play around with real-time nodes via EventReceiverFirehoseFactory.

Appreciate your thoughts since this will help us move in the right direction without spending too much time experimenting.

Thanks
Preetam




Gian Merlino

Jul 9, 2014, 8:28:59 AM
to druid-de...@googlegroups.com
1- Yes, tranquility writes events directly to druid without an external buffer like kafka. It uses in-heap buffers at each end (your end, and druid's).

2- 200K/sec isn't a problem in general, though you will probably need to partition your data across a few peons. I think the usual guidance is something like 5–50K/sec per peon depending on data complexity. Pre-aggregating data at the app always helps if possible.
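As a rough sanity check, assuming a midpoint of that guidance (actual per-peon throughput depends heavily on your data):

```python
# Rough capacity math for the numbers above. The 20K/sec figure is an
# assumed midpoint of the 5-50K/sec per-peon guidance, not a measurement.
incoming_rps = 200_000
per_peon_rps = 20_000
peons_needed = -(-incoming_rps // per_peon_rps)  # ceiling division -> 10
```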

3- The issue with kafka -> druid is that if you need to partition your dataset across multiple consumers, then you have two choices. You can have one consumer group, in which case everything works OK, but if you lose a realtime node then you can't query data that has not been handed off yet (temporarily, if you recover the node later, or permanently, if you don't). Or you can have two consumer groups, both ingesting the same data, which gives you better availability but has a serious problem. Druid segments all have partition numbers, and the broker assumes that two segments with the same partition number will have the same data. It will happily mix together results from segment 0 in consumer group A with segment 1 in consumer group B. Those segments will likely not be consistent across consumer groups, so your queries will not be consistent either. Tranquility's approach is to push data into druid rather than having druid pull it, which means tranquility can ensure that multiple replicas of the same partition number actually have the same data.
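To make the mixing concrete, here is a small sketch (not Druid code; the event names and partition contents are invented) of what the broker's one-replica-per-partition-number assumption does when two consumer groups diverge:

```python
# Hypothetical contents of druid partitions built by two kafka consumer
# groups that read the same topic but split its partitions differently.
group_a = {0: {"ev1", "ev2"}, 1: {"ev3"}}
group_b = {0: {"ev1"}, 1: {"ev2", "ev3"}}

def broker_merge(choice):
    """The broker queries one replica per partition number, assuming all
    replicas of that partition number hold identical data."""
    groups = {"A": group_a, "B": group_b}
    return set().union(*(groups[g][p] for p, g in choice.items()))

# Two equally valid replica choices return different answers:
r1 = broker_merge({0: "A", 1: "B"})  # {'ev1', 'ev2', 'ev3'}
r2 = broker_merge({0: "B", 1: "A"})  # {'ev1', 'ev3'}: "ev2" silently dropped
```

With a single consumer group, or with Tranquility pushing identical data into every replica of a partition number, both choices would return the same set.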

4- Tranquility actually uses the event receiver firehose, so there won't be any performance difference, 'cause it's the same thing. The main thing tranquility buys you is automatic management of indexing service tasks and client-side load balancing. You just tell it what your schema is and how many partitions and replicas you want, and it will take care of getting the right data into the right peons. If you talk to Druid directly, you'll have to manage that sort of stuff yourself.

Preetam Rao

Jul 9, 2014, 10:32:14 AM
to druid-de...@googlegroups.com
Thanks a lot Gian for your detailed answer. Really helps. Comments inline.


On Wednesday, 9 July 2014 17:58:59 UTC+5:30, Gian Merlino wrote:
> 1- Yes, tranquility writes events directly to druid without an external buffer like kafka. It uses in-heap buffers at each end (your end, and druid's).
 
> 2- 200K/sec isn't a problem in general, though you will probably need to partition your data across a few peons. I think the usual guidance is something like 5–50K/sec per peon depending on data complexity. Pre-aggregating data at the app always helps if possible.

> 3- The issue with kafka -> druid is that if you need to partition your dataset across multiple consumers, then you have two choices. You can have one consumer group, in which case everything works OK, but if you lose a realtime node then you can't query data that has not been handed off yet (temporarily, if you recover the node later, or permanently, if you don't). Or you can have two consumer groups, both ingesting the same data, which gives you better availability but has a serious problem. Druid segments all have partition numbers, and the broker assumes that two segments with the same partition number will have the same data. It will happily mix together results from segment 0 in consumer group A with segment 1 in consumer group B. Those segments will likely not be consistent across consumer groups, so your queries will not be consistent either. Tranquility's approach is to push data into druid rather than having druid pull it, which means tranquility can ensure that multiple replicas of the same partition number actually have the same data.

Thanks a lot for the details. So the fact that, for two consumer groups receiving the same set of events over the same time period, Kafka does not guarantee total ordering when more than one partition is present leads to two segments with the same partition number but potentially different data, and hence unreliable results. So Tranquility fixes this by pushing events in the same order to the multiple Druid nodes. Is my understanding correct?
  
> 4- Tranquility actually uses the event receiver firehose, so there won't be any performance difference, 'cause it's the same thing. The main thing tranquility buys you is automatic management of indexing service tasks and client-side load balancing. You just tell it what your schema is and how many partitions and replicas you want, and it will take care of getting the right data into the right peons. If you talk to Druid directly, you'll have to manage that sort of stuff yourself.

Again, thanks a lot for the details. A couple more related queries:
a) The event receiver firehose uses real-time nodes, while Tranquility relies on the indexing service (with all its overlord/middle manager/peon machinery). Isn't having one or more real-time nodes much simpler than setting up the indexing service? Why is one better than the other?
b) While, based on your description, I prefer Tranquility now, just to understand the nuts & bolts: where do we specify partitioning & replication strategies when not using Tranquility? Can you please point me to the documentation for replication/partitioning at real-time nodes as well as historical nodes?

Thanks 
Preetam  

Gian Merlino

Jul 9, 2014, 6:52:45 PM
to druid-de...@googlegroups.com
The multiple-consumer-group problem is less about ordering and more about which kafka partitions end up being read into which druid partitions. You will have a problem if druid partition 1 in consumer group A is reading data from kafka partitions 1 & 2, but druid partition 1 in consumer group B is reading from kafka partitions 2 & 3. The broker will assume that both instances of partition 1 have the same data, but that won't be true.

The main advantage of the indexing service is programmatic allotment of resources. With standalone realtime nodes, each node must have a unique configuration, set up in a particular way such that they work with each other correctly (you need to manually adjust the shardSpecs and firehose configs). With the indexing service, you can submit these configurations to the overlord as tasks and the overlord will find somewhere to run them. With indexing service + tranquility, you don't even have to submit the tasks yourself, since tranquility takes care of it.

Partitioning and replication is mostly about choosing the right shardSpecs for the various nodes or tasks you set up. Tranquility does partitioning by submitting multiple tasks with "linear" shardSpecs that have different partitionNums. Then it spreads out which tasks it sends its events to. This works because at query time, the broker will merge results from all unique shardSpecs currently existing in the cluster. To do replication, it sets up multiple tasks with the same shardSpecs (so the broker will only query one of them at a time) and makes sure to send them the same data. If you wanted to do partitioning and replication without tranquility, you'd do something similar.
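For reference, a trimmed sketch of what one such task might look like (field names approximate for the Druid builds of that era; this is not a complete task spec):

```json
{
  "type": "index_realtime",
  "schema": {
    "dataSource": "events",
    "shardSpec": { "type": "linear", "partitionNum": 0 }
  }
}
```

Partitioning then means also submitting a task with `"partitionNum": 1`; replication means submitting a second task that repeats `"partitionNum": 0` and making sure it receives the same data.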

Preetam Rao

Jul 10, 2014, 1:39:38 AM
to druid-de...@googlegroups.com
Appreciate your patience and the details. It really helps my understanding; most of it makes sense now. A couple of follow-up questions (almost there):


>> The multiple-consumer-group problem is less about ordering and more about which kafka partitions end up being read into which druid partitions.

Great. Now it makes sense.

Q. The shardSpec tells the Druid partition the node is serving, and the firehose consumer group & feed tell the Kafka topic being consumed. The only thing I could not connect is where we specify the "parallelism" of the Kafka consumer group. Is it that each node just runs one thread inside that consumer group? So if we have 3 nodes (or tasks), each with a different partition number in its shardSpec but the same consumer group and the same feed, does that mean we effectively have a Kafka consumer group for this topic with parallelism 3?


>>With the indexing service, you can submit these configurations to the overlord as tasks and the overlord will find somewhere to run them

I think I now have clarity on most of the things you clarified: roughly, there will be a set of shardSpecs (tasks) with distinct partition numbers that are used for partitioning, and one or more further sets of shardSpecs (tasks) that are duplicates of the first set; the broker will have the intelligence to differentiate among replicas of a partition while querying, and Tranquility will have the intelligence to send data to all replicas of a given partition instead of just one of them.

Q. So, with Tranquility, each task will correspond to one "real time index task" as in http://druid.io/docs/latest/Tasks.html, with the firehose as "receiver", and each task would work with one partition (or shard) of the data source. Each of these tasks will run on a peon listening on a port to receive the JSON payload via the specified receiver REST endpoint. So essentially these "peon" workers are what will be ingesting & serving data, and they need to be configured accordingly via middle managers to have the correct heap capacity. Right? If multiple such tasks end up running on the same host, is the receiver REST endpoint changed accordingly with some suffix to avoid port/URL conflicts?


Thanks
Preetam

Gian Merlino

Jul 10, 2014, 11:14:06 AM
to druid-de...@googlegroups.com
Your understanding about kafka consumer group parallelism is correct. Each druid realtime node or task runs a single ingestion thread, and the kafka high level consumer will balance kafka partitions roughly evenly amongst consumers in the same kafka consumer group.
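For context, the relevant knobs in a kafka firehose spec of that era looked roughly like this (values invented; property names approximate):

```json
"firehose": {
  "type": "kafka-0.7.2",
  "consumerProps": {
    "zk.connect": "zk.example.com:2181",
    "groupid": "druid-events"
  },
  "feed": "events"
}
```

Three tasks sharing the same `groupid` and `feed` form one consumer group with parallelism 3.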

If multiple peons (there is one task per peon) end up running on the same middle manager (there is typically one middle manager per actual host) then each peon will get its own port. Tranquility finds them by using service discovery. I think you have to set druid.indexer.task.chathandler.type=announce for this to work, since I don't think that's enabled by default in current druid builds. That setting tells tasks to announce themselves using curator service discovery.
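As a runtime.properties fragment (file location assumed; the property name is as above):

```properties
# Tells tasks to announce themselves via curator service discovery, so
# tranquility can find each peon's host and port.
druid.indexer.task.chathandler.type=announce
```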

Preetam Rao

Jul 10, 2014, 1:07:49 PM
to druid-de...@googlegroups.com

Thanks a lot, Gian, for helping me understand Druid real-time ingestion with Kafka vs Tranquility vs the receiver. Much appreciated. I will close this thread for now, since most of it is clear to me. If any new questions come up when trying out Tranquility, I will start a new topic.

Gian Merlino

Jul 10, 2014, 1:13:33 PM
to druid-de...@googlegroups.com
Glad to help! Best of luck with the rest of your evaluation. Feel free to post another topic if you have more questions.

Mahesh Venkat

Oct 26, 2014, 4:12:29 PM
to druid-de...@googlegroups.com
Absolutely great thread for understanding the Kafka pull model versus the Tranquility push model.
It would be wonderful if there were a sequence diagram explaining the Tranquility push model.
Is Tranquility sending multiple HTTP "REST" calls to each replicated Overlord instance?
How does replication occur across multiple clusters of Overlords? Do they share the same ZK task lists, middle managers, and peons?
Usually async pub/sub messaging is supposed to have higher throughput than sync HTTP calls. If, for every event from my app, I am opening HTTP calls through Tranquility to Druid (i.e. the Overlord), isn't this reducing my ingestion throughput?

BTW, in the docs at http://druid.io/docs/latest/Tasks.html, under the real-time index service, what is the correct value of "firehose": { "type" ...?
The docs say kafka-0.7.2. Should we replace it with "receiver"?

 Thanks
--Mahesh 

Gian Merlino

Oct 26, 2014, 9:20:00 PM
to druid-de...@googlegroups.com
Tranquility supports async http posts as well as batching. If you are using the bundled storm bolt, this happens without you having to do anything special. If you are using the tranquility api directly, you can either use a Beam and make your own batches, or you can use a BeamPacketizer and let tranquility make the batches for you. There should be a batch size for your dataset that maximizes throughput- probably a few hundred to a few thousand messages per batch depending on their size.

There isn't a flow diagram right now but the basic idea is that you connect up tranquility to a single overlord (or a HA cluster of overlords) and it will create one druid realtime task for each partition-replicant. Every time tranquility sends a batch of data, it splits it up so that some messages go to each partition, and a copy is sent to all replicants of that partition. All instances of tranquility for the same datasource will share the same tasks, middle managers, and peons. They coordinate this through zookeeper.
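The per-batch routing described above can be sketched like this (an invented helper, not the Tranquility API):

```python
# Sketch of what tranquility does with each batch: hash events into
# partitions, then send an identical copy of each partition's slice to
# every replica of that partition.

def route_batch(events, num_partitions, num_replicas, key=hash):
    """Return {(partition, replica): [events]} for one batch."""
    slices = {p: [] for p in range(num_partitions)}
    for event in events:
        slices[key(event) % num_partitions].append(event)
    return {
        (p, r): list(batch)              # every replica gets the same copy
        for p, batch in slices.items()
        for r in range(num_replicas)
    }

sent = route_batch(["a", "b", "c", "d"], num_partitions=2, num_replicas=2,
                   key=lambda e: ord(e))  # deterministic key for the example
```

The invariant the broker relies on is that every replica of a partition number receives an identical copy of that partition's slice of the batch.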

If you're using tranquility, you won't actually be setting up realtime tasks yourself- it does this on its own. It will create them with receiver firehoses, which you can see if you inspect the task payloads after they are created.

Gian Merlino

Oct 26, 2014, 9:23:01 PM
to druid-de...@googlegroups.com
Btw, it does still make sense to use tranquility with kafka. We do this in production: instead of druid reading from kafka directly, we have a program reading from kafka and writing to druid using tranquility. This is mostly because it makes it easier to do replicated tasks and zero-downtime schema changes. The reason we keep kafka in the loop is that it provides a valuable buffer between the things generating events (usually your app) and the thing that has to know about druid and communicate with it.

Mahesh Venkat

Nov 8, 2014, 11:40:56 PM
to druid-de...@googlegroups.com
When you use Kafka with a Kafka consumer as a Tranquility client, what will the Kafka configuration for these Tranquility clients be?
Is it two or more Kafka consumer groups on the same topic, with multiple consumers in each consumer group reading different Kafka partitions?
Essentially, two Kafka consumer groups reading the same events would lead to concurrent Tranquility clients ingesting the same events into Druid. I am assuming that Tranquility/the indexing service/the Overlord will handle deduping the events? Is this an accurate conclusion?

--Mahesh

Gian Merlino

Nov 10, 2014, 6:00:08 PM
to druid-de...@googlegroups.com
You'd want to set up a single Kafka consumer group across all of the Tranquility clients. They'd then split up the partitions evenly.