round robin producer with groupby Kafka streams operation


Ravi Nayak

Apr 28, 2017, 3:17:06 PM
to Confluent Platform
Hi,
  We have a topic with multiple partitions to which we write using the round robin partitioning strategy. We have a Kafka Streams application that consumes this topic and we run multiple instances of this streaming app.
My question is: if we perform a groupByKey operation on this topic in the Streams app, will the grouping occur across all the partitions of the topic, or only across the partitions to which the current instance of the Streams app is subscribed?

Thanks,
Ravi

Ravi Nayak

Apr 28, 2017, 3:22:16 PM
to Confluent Platform
Further to this, after a groupBy operation, do all the records for a given key end up in a single partition (after the re-partitioning that groupBy causes)?

Matthias J. Sax

Apr 28, 2017, 5:41:04 PM
to confluent...@googlegroups.com
It depends :)

A Streams app assumes that input topics are already partitioned by key.
Thus, if you do

builder.stream().groupByKey().aggregate()

no repartitioning will happen and your aggregation will only group data
within the partitions each instance consumes (which would be incorrect).

However, if you modify the key before groupByKey(), Streams will
repartition the data across all instances and you will get correctly
partitioned data. Example:

builder.stream().selectKey().groupByKey().aggregate()

See the docs for more details:
http://docs.confluent.io/current/streams/developer-guide.html#aggregating

Look for "Marks the stream for data re-partitioning"
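
To make the difference concrete, here is a minimal plain-Java sketch. It does not use Kafka at all: the class and method names are made up for illustration, and String.hashCode() is only a stand-in for the murmur2 hash that Kafka's default partitioner applies to the serialized key. It places the same keyed records into three partitions, once round-robin and once by key hash, and then counts per key inside a single partition, which is all that one Streams instance sees when no repartitioning happens.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation of partition assignment; no Kafka classes involved.
final class GroupingDemo {

    static List<List<String>> emptyPartitions(int n) {
        List<List<String>> parts = new ArrayList<>();
        for (int i = 0; i < n; i++) parts.add(new ArrayList<>());
        return parts;
    }

    // Round-robin: record i goes to partition i % n, regardless of its key.
    static List<List<String>> roundRobin(List<String> keys, int n) {
        List<List<String>> parts = emptyPartitions(n);
        for (int i = 0; i < keys.size(); i++) parts.get(i % n).add(keys.get(i));
        return parts;
    }

    // Key-based: the partition is derived from the key only.
    // Assumption: String.hashCode() replaces Kafka's murmur2 for simplicity.
    static List<List<String>> byKeyHash(List<String> keys, int n) {
        List<List<String>> parts = emptyPartitions(n);
        for (String k : keys) parts.get(Math.floorMod(k.hashCode(), n)).add(k);
        return parts;
    }

    // Count per key within ONE partition (the view of a single instance).
    static Map<String, Integer> countPerKey(List<String> partition) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String k : partition) counts.merge(k, 1, Integer::sum);
        return counts;
    }

    // Number of partitions that contain at least one record with this key.
    static long partitionsHolding(String key, List<List<String>> parts) {
        return parts.stream().filter(p -> p.contains(key)).count();
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "b", "a", "b", "a", "b");
        for (List<String> p : roundRobin(keys, 3)) {
            System.out.println("round-robin partition counts: " + countPerKey(p));
        }
        for (List<String> p : byKeyHash(keys, 3)) {
            System.out.println("key-hash partition counts:    " + countPerKey(p));
        }
    }
}
```

With round-robin input, each per-partition count is only a partial result for a key, so a plain groupByKey() without repartitioning would emit several partial aggregates per key. With key-based placement, every record for a key sits in one partition and the per-partition count is the full count.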


-Matthias
> --
> You received this message because you are subscribed to the Google
> Groups "Confluent Platform" group.
> To unsubscribe from this group and stop receiving emails from it, send
> an email to confluent-platf...@googlegroups.com
> <mailto:confluent-platf...@googlegroups.com>.
> To post to this group, send email to confluent...@googlegroups.com
> <mailto:confluent...@googlegroups.com>.
> To view this discussion on the web visit
> https://groups.google.com/d/msgid/confluent-platform/42b41852-11f4-4091-978d-247bc69aae82%40googlegroups.com
> <https://groups.google.com/d/msgid/confluent-platform/42b41852-11f4-4091-978d-247bc69aae82%40googlegroups.com?utm_medium=email&utm_source=footer>.
> For more options, visit https://groups.google.com/d/optout.


Ravi Nayak

Apr 28, 2017, 5:58:52 PM
to Confluent Platform
Thanks Matthias. 

So, are there any benefits to switching our input partitioning strategy to hash based partitioning for this purpose since a re-keying operation will handle this for us anyways? Our concern is that we may have uneven partitions if we do that. On the other hand, a re-partitioning on the fly will create uneven partitions anyways. Any suggestions on this?



Ravi Nayak

Apr 28, 2017, 6:12:05 PM
to Confluent Platform
So, just to clarify, we are artificially mapping the key from a string to a tuple (by breaking the string into 2 parts logically) and then doing a groupByKey. So, it seems to me that a better option may be to do a hash partition upfront and a simple groupByKey upfront. My assumption being that we are paying the cost of uneven partitions during streaming if not upfront.



Matthias J. Sax

Apr 29, 2017, 2:20:18 PM
to confluent...@googlegroups.com
> are there any benefits to switching our input partitioning strategy to hash based partitioning for this purpose since a re-keying operation will handle this for us anyways?

Yes. You save the costly repartitioning step.

> My assumption being that we are paying the cost of uneven partitions during streaming if not upfront.

Yes.

> So, it seems to me that a better option may be to do a hash partition upfront and a simple groupByKey upfront.

Yes to hash partitioning up-front. Not sure what you mean by
"groupByKey" upfront?

Note, if you call `groupByKey` and the data is already partitioned by key,
this will be a "no-op" and it adds no overhead to your Streams
application.
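
On the uneven-partition concern, a small plain-Java sketch may help show why the skew appears either way once data is partitioned by key. Nothing here is Kafka API: the class name, method names, and the skewed sample keys are hypothetical, and String.hashCode() again stands in for the real partitioner hash. It compares partition sizes for the same records under round-robin and under key-based placement.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java simulation of partition sizes; no Kafka classes involved.
final class SkewDemo {

    // Round-robin: sizes depend only on the record count, not on the keys.
    static int[] roundRobinSizes(List<String> keys, int n) {
        int[] sizes = new int[n];
        for (int i = 0; i < keys.size(); i++) sizes[i % n]++;
        return sizes;
    }

    // Key-based: all records of a key land in the same partition, so sizes
    // follow the key distribution.
    // Assumption: String.hashCode() replaces Kafka's murmur2 for simplicity.
    static int[] keyHashSizes(List<String> keys, int n) {
        int[] sizes = new int[n];
        for (String k : keys) sizes[Math.floorMod(k.hashCode(), n)]++;
        return sizes;
    }

    public static void main(String[] args) {
        // Hypothetical skewed workload: key "hot" accounts for 4 of 6 records.
        List<String> keys = Arrays.asList("hot", "hot", "hot", "hot", "x", "y");
        System.out.println("round-robin sizes: " + Arrays.toString(roundRobinSizes(keys, 3)));
        System.out.println("key-hash sizes:    " + Arrays.toString(keyHashSizes(keys, 3)));
    }
}
```

The key-based sizes are what an internal repartition topic would look like as well, since it is also partitioned by key. Partitioning by key at the producer therefore moves the skew upstream rather than adding new skew, and it avoids the extra repartitioning step.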


-Matthias

Ravi Nayak

Apr 30, 2017, 11:17:33 PM
to Confluent Platform
Great - you have answered all my questions Matthias - thanks.

I accidentally mis-typed the "groupByKey" upfront statement. What I meant to say was "So, it seems to me that a better option may be to do a hash partition upfront and then a groupByKey in the streaming app - which will be a no-op as you indicated"

