[Kafka Streams] Detect absence of join after a time period


Nicolas Colomer

Aug 24, 2016, 7:04:49 AM
to Confluent Platform
Hello there,

I'm trying to join two streams of display and click records using Kafka Streams. I want to detect both that a join occurred within a time window... and that it did not!

No problem for the first use case (pseudo code):

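// assumes import scala.concurrent.duration._ and hypothetical Click/Display value types
clickStream
  .join(displayStream,
    (click: Click, display: Display) => (display, click), // ValueJoiner pairing the two records
    JoinWindows.of("occurred-30m-before").before(30.minutes.toMillis))
  .to("clicked-display-topic")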

For the second use case (i.e., detecting that no join occurred), leftJoin from the high-level API looks like a good candidate, since:
- I want the first stream (displays) to drive the output
- I want to detect that no join with the second stream (clicks) occurred...
- ...after a given period of time / sliding time window (where the display is the reference)

Some pseudo-code of what I'd like to achieve:

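// assumes import scala.concurrent.duration._ and hypothetical Click/Display value types
displayStream
  .leftJoin(clickStream,
    (display: Display, click: Click) => (display, click), // click is null when nothing matched (yet)
    JoinWindows.of("occurred-30m-after").after(30.minutes.toMillis))
  .filter((id, pair) => pair._2 == null) // keep only displays without a matching click
  .to("not-clicked-display-topic")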
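For reference, the window condition behind both snippets can be written as a tiny predicate. This is a sketch based on the SQL-style condition given in the `JoinWindows` Javadoc (`stream1.ts - before <= stream2.ts <= stream1.ts + after`, where stream1 is the stream `join`/`leftJoin` is called on); `JoinWindowBounds` and `joinable` are hypothetical names, and the unset boundary is assumed to be 0, as the pseudo code implies.

```scala
// Hypothetical helper, not part of Kafka Streams: spells out which record
// timestamps a JoinWindows configured with before(...) / after(...) pairs up.
object JoinWindowBounds {
  val ThirtyMinutes: Long = 30L * 60 * 1000 // 30 minutes in milliseconds

  // ts1: timestamp of the record from the stream join()/leftJoin() is called on
  // ts2: timestamp of the record from the other stream
  // Mirrors: stream1.ts - before <= stream2.ts <= stream1.ts + after
  def joinable(ts1: Long, ts2: Long, beforeMs: Long, afterMs: Long): Boolean =
    ts1 - beforeMs <= ts2 && ts2 <= ts1 + afterMs
}
```

With `before(30 minutes)` on the click stream, a click at minute 40 matches displays from minute 10 to minute 40; with `after(30 minutes)` on the display stream, a display at minute 0 matches clicks from minute 0 to minute 30.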

But I can't get it to work. This stream always outputs displays (with a null click value) to not-clicked-display-topic as soon as they arrive, without waiting for the 30-minute window to elapse. Thus, even if a click arrives shortly afterwards (within the join window), we have already emitted the (wrong) information that no click occurred for this display.
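To make the expected behaviour concrete, here is a plain Scala model (no Kafka dependency) of the semantics being asked for: a display is reported as "not clicked" only once stream time has moved past the end of its window, and only if no matching click arrived in the meantime. This is not a Kafka Streams feature; `Record`, `NoClickDetector` and their members are hypothetical, the model assumes one display per key, and it ignores out-of-order records.

```scala
import scala.collection.mutable

// Hypothetical record type: key is the display id, isClick distinguishes the two streams.
final case class Record(key: String, ts: Long, isClick: Boolean)

final class NoClickDetector(windowMs: Long) {
  // Displays waiting for their window to close, by key (insertion order kept).
  private val pending = mutable.LinkedHashMap.empty[String, Long]
  // Highest timestamp seen so far ("stream time").
  private var streamTime = Long.MinValue

  /** Processes one record; returns keys of displays whose window closed without a click. */
  def process(r: Record): List[String] = {
    streamTime = math.max(streamTime, r.ts)
    if (r.isClick) {
      // A click inside [displayTs, displayTs + windowMs] cancels the pending result.
      pending.get(r.key).foreach { displayTs =>
        if (r.ts >= displayTs && r.ts <= displayTs + windowMs) pending.remove(r.key)
      }
    } else {
      pending.update(r.key, r.ts) // assumes one display per key
    }
    expire()
  }

  // Emits (and forgets) every pending display whose window end lies before stream time.
  private def expire(): List[String] = {
    val closed = pending.collect { case (k, ts) if ts + windowMs < streamTime => k }.toList
    closed.foreach(pending.remove)
    closed
  }
}
```

The key design choice is that the "no click" result is driven by stream time rather than by the arrival of the display itself, which is exactly what delays the output until the window can no longer change.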

Is this use case implementable with Kafka Streams?

Nicolas

(this topic, with a more precise question, originated from this one: https://groups.google.com/forum/#!topic/confluent-platform/rn8CJu7Wfcw)

Michael Noll

Aug 24, 2016, 1:37:02 PM
to confluent...@googlegroups.com
Nicolas,

A question: are you doing stream-stream, stream-table, and/or table-table joins?


--
Michael G. Noll
Product Manager | Confluent

Nicolas Colomer

Aug 24, 2016, 2:42:48 PM
to Confluent Platform

Hi Michael,

I'm trying to do a stream-stream join because of its sliding time window nature.

Nicolas


Matthias J. Sax

Sep 1, 2016, 1:13:43 PM
to confluent...@googlegroups.com
Hi Nicolas,

sorry for the late reply...

The problem you are facing is related to Kafka Streams' left (and
outer) windowed-join semantics. It's not SQL...

If we have a left join and the first input arrives on the left side,
the window of the right input is empty, and we just emit
<key:join(v-left, null)> because we do not know whether there will be
any input for the right side later on (if we do not emit, we might miss
this result).

However, if we later get a record on the right input, we also emit
<key:join(v-left, v-right)>, which is the correct result; the previous
result record is "wrong" with respect to SQL semantics. But we cannot
undo it. We do this because if we started delaying this first output to
see whether we get an actual join result later on, we would open a
Pandora's box of what-if cases... (also with respect to late-arriving,
i.e., out-of-order, records).
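The eager behaviour described above can be modelled in a few lines of plain Scala (no Kafka dependency). This is a simplified sketch, not the Kafka Streams implementation: `EagerLeftJoin`, `onLeft` and `onRight` are hypothetical names, all records share a single key, `Option` stands in for `null`, and buffered records are never evicted.

```scala
import scala.collection.mutable

// Simplified model of an eager windowed left join for a single key.
final class EagerLeftJoin(beforeMs: Long, afterMs: Long) {
  private val lefts = mutable.ListBuffer.empty[(Long, String)]
  private val rights = mutable.ListBuffer.empty[(Long, String)]

  // Same window condition as JoinWindows: left.ts - before <= right.ts <= left.ts + after
  private def joinable(leftTs: Long, rightTs: Long): Boolean =
    leftTs - beforeMs <= rightTs && rightTs <= leftTs + afterMs

  // A left record joins whatever right records are buffered *now*;
  // with none in the window it immediately emits (left, None).
  def onLeft(ts: Long, v: String): List[(String, Option[String])] = {
    lefts += ((ts, v))
    val matches = rights.collect { case (rts, rv) if joinable(ts, rts) => (v, Some(rv)) }.toList
    if (matches.isEmpty) List((v, None)) else matches
  }

  // A later right record emits real join results; earlier (left, None)
  // results were already sent downstream and cannot be retracted.
  def onRight(ts: Long, v: String): List[(String, Option[String])] = {
    rights += ((ts, v))
    lefts.collect { case (lts, lv) if joinable(lts, ts) => (lv, Some(v)) }.toList
  }
}
```

Feeding a display at time 0 and a click at time 10 with an `after` window of 30 produces two output records, `("display", None)` followed by `("display", Some("click"))`, so a downstream `filter` on a missing click keeps the first one.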

One thing you could do is to make sure (this would be best effort only)
that the right input of a left join is processed first, i.e., that its
windows are fully populated before the left input is processed. You could
try to manipulate timestamps to achieve this. This would at least
minimize the probability of getting a "wrong" <key:join(v-left, null)>
record in the output.
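The timestamp trick can be illustrated with a small simulation, under clearly stated assumptions: records from both streams are processed strictly in timestamp order, every display timestamp is shifted forward by the window size, and the window becomes `before(window)` / `after(0)` relative to the shifted display. `Event`, `ShiftedLeftJoin` and `simulate` are hypothetical names; in Kafka Streams the shift would presumably come from a custom `TimestampExtractor`, and processing order by timestamp is not strictly guaranteed, which is why this stays best effort.

```scala
// Hypothetical event type: a single display key is assumed, so only ids matter.
final case class Event(ts: Long, isDisplay: Boolean, id: String)

object ShiftedLeftJoin {
  /** Returns the ids of displays for which an eager (display, null) result is emitted. */
  def simulate(events: Seq[Event], windowMs: Long, shift: Boolean): List[String] = {
    // Optionally move every display forward by the window size.
    val shifted = events.map(e => if (shift && e.isDisplay) e.copy(ts = e.ts + windowMs) else e)
    // Keep the same matching range [original display ts, original display ts + window].
    val (before, after) = if (shift) (windowMs, 0L) else (0L, windowMs)
    val clicks = scala.collection.mutable.ListBuffer.empty[Long]
    val unmatched = scala.collection.mutable.ListBuffer.empty[String]
    // Process records in timestamp order (ties: clicks before displays).
    for (e <- shifted.sortBy(e => (e.ts, e.isDisplay))) {
      if (e.isDisplay) {
        val hit = clicks.exists(c => e.ts - before <= c && c <= e.ts + after)
        if (!hit) unmatched += e.id // eager (display, null) result
      } else clicks += e.ts
    }
    unmatched.toList
  }
}
```

With a display at time 0 and a click at time 10 (window 30), the unshifted run reports the display as unmatched, while the shifted run sees the click first and reports nothing. Clicks that arrive later than the shift still slip through, as noted above.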


-Matthias