Kafka Stream understanding JoinWindow


Amirreza Soudi

Jan 23, 2017, 11:56:59 AM
to Confluent Platform

I have two streams of data and I want to join them over a window of, let's say, one month. With live data everything is easy with KStream and join. I did something like this:


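import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueJoiner;

KStream<String, GenericRecord> stream1 =
        builder.stream(Serdes.String(), new CustomizeAvroSerde<>(this.getSchemaRegistryClient(), this.getKafkaPropsMap()), getKafkaConsumerTopic1());

KStream<String, GenericRecord> stream2 =
        builder.stream(Serdes.String(), new CustomizeAvroSerde<>(this.getSchemaRegistryClient(), this.getKafkaPropsMap()), getKafkaConsumerTopic2());

long joinWindowSizeMs = 30L * 24L * 60L * 60L * 1000L; // 30 days

// Records with the same key are joined if their timestamps are at most
// joinWindowSizeMs apart (earlier or later).
KStream<String, GenericRecord> joinStream = stream1.join(stream2,
        new ValueJoiner<GenericRecord, GenericRecord, GenericRecord>() {
            @Override
            public GenericRecord apply(GenericRecord genericRecord, GenericRecord genericRecord2) {
                final GenericRecord joinedRecord = new GenericData.Record(joinedRecordSchema);
                // ... (populating joinedRecord from both inputs is elided in the original)

                return joinedRecord;
            }
        }, JoinWindows.of(joinWindowSizeMs));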
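The rule behind JoinWindows.of(joinWindowSizeMs) above is that records with the same key from the two streams are joinable when their timestamps are at most joinWindowSizeMs apart, in either direction. The stdlib-only Java model below is a sketch of that rule, not Kafka code: JoinWindowSketch, Rec and join() are hypothetical names, and the nested loop stands in for the windowed state stores a real stream-stream join uses. It also shows why the L suffixes in joinWindowSizeMs matter: 30 days in milliseconds does not fit in an int.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Stdlib-only model of the JoinWindows.of(size) rule; NOT Kafka code.
// JoinWindowSketch, Rec and join() are hypothetical names for illustration.
public class JoinWindowSketch {

    // 30 days in ms, computed in long arithmetic (note the L suffixes).
    static final long JOIN_WINDOW_MS = 30L * 24L * 60L * 60L * 1000L;

    // A keyed record with a timestamp in milliseconds.
    static final class Rec {
        final String key;
        final long ts;
        final String value;

        Rec(String key, long ts, String value) {
            this.key = key;
            this.ts = ts;
            this.value = value;
        }
    }

    // Naive inner join: same key and timestamps at most windowMs apart,
    // in either direction (before or after).
    static List<String> join(List<Rec> left, List<Rec> right, long windowMs) {
        List<String> out = new ArrayList<>();
        for (Rec l : left) {
            for (Rec r : right) {
                if (l.key.equals(r.key) && Math.abs(l.ts - r.ts) <= windowMs) {
                    out.add(l.value + "+" + r.value);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long day = 24L * 60L * 60L * 1000L;
        List<Rec> s1 = Arrays.asList(new Rec("k", 0L, "a1"), new Rec("k", 100 * day, "a2"));
        List<Rec> s2 = Arrays.asList(new Rec("k", 10 * day, "b1"));

        System.out.println(JOIN_WINDOW_MS);               // 2592000000
        System.out.println(30 * 24 * 60 * 60 * 1000);     // -1702967296 (int overflow)
        System.out.println(join(s1, s2, JOIN_WINDOW_MS)); // [a1+b1]
    }
}
```

Here a1 and b1 are 10 days apart and join; a2 is 90 days away from b1 and does not.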

The problem appears when I want to replay data. Let's say I want to redo this join for the past 6 months of data. Since I run the pipeline over all of the data at once, Kafka Streams joins all joinable records and does not take the time difference into account (it should only join records that are within one month of each other). I am assuming the JoinWindows time is the time at which the data was inserted into the Kafka topic; am I right?
And how can I change this time so that the replay runs correctly? That is, when re-inserting the past 6 months of data, each record should be joined only with records that fall within a one-month window of its own time.
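The replay concern can be modeled without Kafka: the same pairing rule gives different results depending on which timestamp it is evaluated on. In the hypothetical sketch below, eventTime is when an event originally happened and ingestTime is when it was re-written to the topic during the replay; both field names, and the ReplaySketch/Event/countJoins names, are assumptions for illustration and not part of any Kafka API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.ToLongFunction;

// Stdlib-only model of the replay problem; NOT Kafka code.
// Event, eventTime and ingestTime are hypothetical names.
public class ReplaySketch {
    static final long DAY_MS = 24L * 60L * 60L * 1000L;
    static final long WINDOW_MS = 30L * DAY_MS;

    static final class Event {
        final String key;
        final String value;
        final long eventTime;  // when the event originally happened
        final long ingestTime; // when it was (re)written to the topic

        Event(String key, String value, long eventTime, long ingestTime) {
            this.key = key;
            this.value = value;
            this.eventTime = eventTime;
            this.ingestTime = ingestTime;
        }
    }

    // Count joined pairs when the 30-day window is evaluated on the
    // timestamp chosen by ts (event time or ingestion time).
    static int countJoins(List<Event> left, List<Event> right, ToLongFunction<Event> ts) {
        int n = 0;
        for (Event l : left) {
            for (Event r : right) {
                if (l.key.equals(r.key)
                        && Math.abs(ts.applyAsLong(l) - ts.applyAsLong(r)) <= WINDOW_MS) {
                    n++;
                }
            }
        }
        return n;
    }

    public static void main(String[] args) {
        long replayDay = 200 * DAY_MS; // all records re-inserted on the same day
        List<Event> s1 = Arrays.asList(new Event("k", "jan", 0L, replayDay));
        List<Event> s2 = Arrays.asList(
                new Event("k", "jan", 5 * DAY_MS, replayDay),
                new Event("k", "jun", 150 * DAY_MS, replayDay));

        System.out.println(countJoins(s1, s2, e -> e.ingestTime)); // 2
        System.out.println(countJoins(s1, s2, e -> e.eventTime));  // 1
    }
}
```

With ingestion time every replayed record looks simultaneous, so the January record also joins the June one; with event time only the pair 5 days apart joins.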


You can find the question on SO here: http://stackoverflow.com/questions/41811539/kafka-stream-understanding-joinwindow

Matthias J. Sax

Jan 23, 2017, 1:03:49 PM
to confluent...@googlegroups.com
Your question is a duplicate of
http://stackoverflow.com/questions/41707854/how-to-manage-kafka-kstream-to-kstream-windowed-join

If you have further questions, please let us know.


-Matthias


Amirreza Soudi

Jan 23, 2017, 1:50:04 PM
to Confluent Platform

This question is not a duplicate of http://stackoverflow.com/questions/41707854/how-to-manage-kafka-kstream-to-kstream-windowed-join. There I asked how I can join based on a time window; here I am talking about data replay. From my understanding, during a join Kafka uses the time at which the data was inserted into the topic as the time for JoinWindows. So if you replay the data and re-insert records from 6 months ago, Kafka treats them as new data inserted today and will join them with other data that actually belongs to today, which it shouldn't.
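Kafka Streams obtains record timestamps through a configurable TimestampExtractor, so one way to picture the alternative to insertion time is a function that reads the original event time out of the payload. The plain-Java sketch below models only that idea; it is not the Kafka TimestampExtractor API, and the "eventTime" field name, the extract() signature and the fallback rule are all assumptions. In the pipeline above the payload would be the Avro GenericRecord rather than a Map.

```java
import java.util.HashMap;
import java.util.Map;

// Stdlib-only model of an event-time timestamp extractor; NOT the Kafka
// Streams TimestampExtractor API. The "eventTime" field name and the
// extract() signature are hypothetical.
public class EventTimeExtractorSketch {

    // Prefer the event time stored in the payload; fall back to the record's
    // own timestamp (e.g. the time it was written to the topic) when the field
    // is missing, not numeric, or negative.
    static long extract(Map<String, Object> payload, long recordTimestamp) {
        Object v = payload.get("eventTime");
        if (v instanceof Number) {
            long t = ((Number) v).longValue();
            if (t >= 0) {
                return t;
            }
        }
        return recordTimestamp;
    }

    public static void main(String[] args) {
        long replayTs = 1485190000000L; // hypothetical time the record was re-inserted
        Map<String, Object> withTime = new HashMap<>();
        withTime.put("eventTime", 1469000000000L);
        Map<String, Object> withoutTime = new HashMap<>();

        System.out.println(extract(withTime, replayTs));    // 1469000000000
        System.out.println(extract(withoutTime, replayTs)); // 1485190000000
    }
}
```

The fallback keeps records without an event-time field usable, at the cost of treating them as if they happened at insertion time.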


Matthias J. Sax

Jan 23, 2017, 8:10:09 PM
to confluent...@googlegroups.com
Sorry about that. I misunderstood the question.

I added an SO answer.


-Matthias

