Kafka Streams creates temporary topics on Kafka broker when using Joins


Giulio Vito de Musso

May 8, 2017, 11:08:57 AM
to Confluent Platform
Hello all,

I'm using Kafka Streams and I noticed a behaviour that seems strange to me, and I would like an explanation for it.
In particular, I'm using joins, filters and branches to manipulate one topic. The results are then written to another topic:

public class KafkaStreamsMainClass {

  public static void main(final String[] args) throws Exception {

    [...]

    KStreamBuilder builder = new KStreamBuilder();

    KStream<GenericRecord, GenericRecord> sourceStream = builder.stream(prop.getProperty("KAFKA_SOURCE_TOPIC"));

    String schema = ...;

    KStream<GenericRecord, GenericRecord> finishedTracesFiltered = sourceStream
        .filter((GenericRecord key, GenericRecord value) -> value.get("endTime") != null)
        .mapValues((GenericRecord value) -> {
          // some mapping operations
          return value;
        });

    KStream<GenericRecord, GenericRecord>[] branchedStreams = sourceStream
        .filter((GenericRecord key, GenericRecord value) -> value.get("endTime") == null)
        .branch((GenericRecord key, GenericRecord value) -> value.get("field1") != null,
                (GenericRecord key, GenericRecord value) -> value.get("field2") != null);

    KStream<GenericRecord, GenericRecord> finishedRequests = finishedTracesFiltered.join(branchedStreams[0],
        (GenericRecord value1, GenericRecord value2) -> {
          // some operations
          return value1;
        }, JoinWindows.of(TimeUnit.SECONDS.toMillis(2)));

    KStream<GenericRecord, GenericRecord> finishedJobs = finishedTracesFiltered.join(branchedStreams[1],
        (GenericRecord value1, GenericRecord value2) -> {
          // some operations
          return value1;
        }, JoinWindows.of(TimeUnit.SECONDS.toMillis(2)));

    finishedRequests.to(prop.getProperty("KAFKA_TARGET_TOPIC"));
    finishedJobs.to(prop.getProperty("KAFKA_TARGET_TOPIC"));

    KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
    streams.start();

    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
  }
}


The data is read from the source topic and written to the target topic as expected, but this code also creates some other topics, which seem to me to be auxiliary. Something like:

something-KSTREAM-JOINTHIS-0000000009-store-changelog
something-KSTREAM-JOINTHIS-0000000014-store-changelog
something-KSTREAM-JOINOTHER-0000000015-store-changelog
something-KSTREAM-JOINOTHER-0000000010-store-changelog


Do you know why those topics are created, and by whom? They are polluting my Kafka broker, so it would be nice if Kafka Streams did not create them.

Thank you


Matthias J. Sax

May 8, 2017, 12:20:38 PM
to confluent...@googlegroups.com
Those topics are created by Streams for fault-tolerance. They are used
to re-create your application state in case a Streams instance crashes.

See
http://docs.confluent.io/current/streams/architecture.html#fault-tolerance
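
As a rough illustration of how the names in the question come about: a changelog topic is named after the application and the state store it backs, in the form <application.id>-<store name>-changelog. The sketch below is plain Java with no Kafka dependency; the class ChangelogTopicNames and its methods are hypothetical helpers, and it assumes that "something" in the question stands for the application.id.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative helper only; this class is NOT part of the Kafka Streams API.
class ChangelogTopicNames {

    // Builds a name of the form <application.id>-<store name>-changelog.
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    // True if the topic name looks like a changelog topic of the given application.
    static boolean isChangelogOf(String applicationId, String topic) {
        return topic.startsWith(applicationId + "-") && topic.endsWith("-changelog");
    }

    // Returns the topics that are not changelog topics of the given application.
    static List<String> withoutChangelogs(String applicationId, List<String> topics) {
        List<String> result = new ArrayList<>();
        for (String topic : topics) {
            if (!isChangelogOf(applicationId, topic)) {
                result.add(topic);
            }
        }
        return result;
    }
}
```

For example, changelogTopic("something", "KSTREAM-JOINTHIS-0000000009-store") gives the first topic name listed in the question. Each windowed stream-stream join keeps one store per side (JOINTHIS and JOINOTHER), which is why two joins show up as four changelog topics. A filter like withoutChangelogs can hide them when listing topics, without touching the topics themselves.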

It is possible to "get rid" of them by specifying a custom store for
your join that has this feature disabled. But it's highly recommended not
to disable the state store recovery mechanism! You might get
incorrect/incomplete results in case of failure.
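
To make the risk concrete, here is a toy model in plain Java. It is not how Kafka Streams is implemented: ToyStore and all of its members are hypothetical. The in-memory map stands in for the local state of a join, and the list stands in for the changelog topic, which is assumed to survive a crash.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a state store with an optional changelog; NOT Kafka Streams code.
class ToyStore {
    private final boolean loggingEnabled;
    // Local state: assumed to be lost when the instance crashes.
    private final Map<String, String> local = new HashMap<>();
    // Stands in for the changelog topic: assumed to survive a crash.
    private final List<String[]> changelog;

    ToyStore(boolean loggingEnabled, List<String[]> changelog) {
        this.loggingEnabled = loggingEnabled;
        this.changelog = changelog;
    }

    // Writes to local state and, if logging is enabled, records the update.
    void put(String key, String value) {
        local.put(key, value);
        if (loggingEnabled) {
            changelog.add(new String[] {key, value});
        }
    }

    String get(String key) {
        return local.get(key);
    }

    // Simulates a restart: a fresh store replays whatever the changelog holds.
    static ToyStore restore(boolean loggingEnabled, List<String[]> changelog) {
        ToyStore store = new ToyStore(loggingEnabled, changelog);
        for (String[] entry : changelog) {
            store.local.put(entry[0], entry[1]);
        }
        return store;
    }
}
```

With logging enabled, a restored store sees every earlier put again. With logging disabled, the changelog is empty, so the restored store starts from nothing, and a join that relied on records buffered before the crash would miss matches.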

You say it's "polluting" your cluster. Can you explain in more detail
what the exact problem is?


-Matthias

Giulio Vito de Musso

May 9, 2017, 2:53:43 AM
to Confluent Platform
Hi Matthias,
thanks for the explanation. Since I didn't know why those topics were created, by "polluting the broker" I meant that a lot of topics were being created.
But now that I know it is done for fault tolerance, everything is ok :)

Regards

Matthias J. Sax

May 9, 2017, 3:54:07 AM
to confluent...@googlegroups.com
Glad to hear :)


-Matthias