Kafka Streams - StreamPartitioner API

200 views
Skip to first unread message

Martin Tyler

unread,
Sep 28, 2016, 4:29:25 AM9/28/16
to Confluent Platform
Hi,

I am using the Kafka Streams processor API.

A common pattern so far in my development has been to use the StreamPartitioner you can pass to addSink. This allows me to keep the key I want for compaction purposes, the one that uniquely identifies my message, but partition based on some other criteria.

My StreamPartitioners pick out a different attribute from the message and use that to base the partitioning on (I actually copied a bit of code from inside Kafka so I could use Kafka's default partitioning, but based on a key I pass in, rather than the message key). This is useful when repartitioning a topic for joining on a foreign key. eg

mainkey : { foreignkey, rest-of-message-fields }

So compaction takes place using mainkey, but I can repartition using foreignkey to later join with something else.


Anyway. This all works fine (a bit of a hack to have to copy some internal Kafka code, would be good if that functionality was opened up somehow).


However, I have just started to implement deleting from a compacted topic. This is meant to work by sending 'mainkey: null'. Initially this was fine, but then I had to forward this on through the topology. So I receive a 'mainkey: null' message, lookup something in a KeyValueStore based on that mainkey, then send on a message or messages with 'other-mainkey: null' on another topic.

All good I thought, until I ran it and got an exception in my StreamPartitioner and realised the problem. The StreamPartitioner gets passed (K key, V value, int numPartitions), and until now I had been using an attribute of the 'value' passed, but now of course that is null and all I have to go on is the message key, which of course doesn't contain the info I need, and I don't want to change it to contain that since that would ruin my unique key for compaction.

So, what is the problem here. I think the decoupling of the Processor's process() method and the context.forward() call within, from the addSink's StreamPartitioner means the StreamPartitioner does not have access to all the data that it might need to make a decision.

This would be solved by the ability to pass a partition to context.forward() since this is done within the process() callback, where I have access to things looked up from KeyValueStores.

Is there another solution or does this sound like a good addition to the API?

Regards
Martin

Matthias J. Sax

unread,
Sep 29, 2016, 1:18:34 AM9/29/16
to confluent...@googlegroups.com
FYI:

a similar issues is discussed here:
https://issues.apache.org/jira/browse/KAFKA-3705

-Matthias
> --
> You received this message because you are subscribed to the Google
> Groups "Confluent Platform" group.
> To unsubscribe from this group and stop receiving emails from it, send
> an email to confluent-platf...@googlegroups.com
> <mailto:confluent-platf...@googlegroups.com>.
> To post to this group, send email to confluent...@googlegroups.com
> <mailto:confluent...@googlegroups.com>.
> To view this discussion on the web visit
> https://groups.google.com/d/msgid/confluent-platform/f557bdbf-97ca-4001-b3ac-ad95be1df525%40googlegroups.com
> <https://groups.google.com/d/msgid/confluent-platform/f557bdbf-97ca-4001-b3ac-ad95be1df525%40googlegroups.com?utm_medium=email&utm_source=footer>.
> For more options, visit https://groups.google.com/d/optout.

signature.asc
Reply all
Reply to author
Forward
0 new messages