Process a Kafka topic with a delay using Kafka Streams


Nicolas Colomer

Aug 17, 2016, 4:37:37 PM
to Confluent Platform
Hi there,

I was wondering how to achieve the following use case with Kafka Streams: we have a topic containing records that we'd like to process after, say, a 48h delay, so that once this delay has elapsed we can look things up in a database and generate new business facts.

We currently have similar jobs implemented using Samza. We managed to consume Kafka topics with a delay using a custom MessageChooser that polls a message and either accepts it or loops while its timestamp falls within a sliding time window.

Is this at all possible?

Thanks for your thoughts and guidance,
Nicolas

Michael Noll

Aug 18, 2016, 2:27:48 AM
to confluent...@googlegroups.com
Nicolas,

yes, this is possible using the Processor API.  See my answer on Stack Overflow:

Alternatively, you could also consider starting/stopping your Kafka Streams applications to achieve 48h intervals.

That said, perhaps there's a better approach to your problem, but this might depend on, e.g., why exactly you need to process after a 48h delay (is that because the lookup data arrives two days later? Or...?)




--
Michael G. Noll
Product Manager | Confluent
Follow us: Twitter | Blog

Nicolas Colomer

Aug 18, 2016, 3:38:10 AM
to Confluent Platform
Hi Michael and thanks for your answer!

Let me elaborate further on our use case:

We'd like to observe events coming from some provider. We have no guarantee on their arrival time; we only know that an event might occur anytime within a 48h window after some unique triggering event was emitted, or not at all (i.e., no event is itself information!).

What we did to solve this problem using Samza was to consume this stream of provider events with a 48h delay, i.e., accumulating events in Kafka and waiting for the topic's head to be 48h behind now (we can assume messages are correctly ordered by time, so the head is always the most recent event). We still need to consume those events (which might come at a high rate) as a stream to get feedback as soon as possible. Thus, starting/stopping the application would come down to a kind of batch/micro-batch that we could otherwise do more efficiently.

Thinking a bit more about the problem and looking at the high-level Kafka Streams DSL documentation, I found that the following KStream method might be our solution:
KStream.leftJoin(KStream, ValueJoiner, JoinWindows) using JoinWindows.after(long timeDifference) (or within(long timeDifference))

Indeed, what we are trying to do is a temporal left join, allowing us to emit a message even if no join occurred. But I'm curious to know how this join is achieved under the hood: do messages accumulate in memory (potentially gigabytes of data), or does the API wait for the topics' heads to be delayed according to the chosen JoinWindows?
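
For illustration, such a windowed left join might look like this (a rough sketch: the Event/FollowUp types, default serdes, and window name are placeholders, and in a left join the right-hand value is null when nothing matched within the window):

import java.util.concurrent.TimeUnit
import org.apache.kafka.streams.kstream.{JoinWindows, KStream, ValueJoiner}

case class Event(id: String)    // placeholder domain types
case class FollowUp(id: String)

val events: KStream[String, Event] = ???       // original records
val followUps: KStream[String, FollowUp] = ??? // may arrive up to 48h later

val joined: KStream[String, (Event, Option[FollowUp])] =
  events.leftJoin(
    followUps,
    new ValueJoiner[Event, FollowUp, (Event, Option[FollowUp])] {
      // followUp is null when no matching record was seen in the window
      override def apply(event: Event, followUp: FollowUp) = (event, Option(followUp))
    },
    JoinWindows.of("within-48h").after(TimeUnit.HOURS.toMillis(48))
  )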

Nicolas


Nicolas Colomer

Aug 18, 2016, 11:49:56 AM
to Confluent Platform
Well actually, the KStream.leftJoin with JoinWindows.after approach from my previous message does not solve the problem, because we still miss "non-event" detection after the delay.

Thinking back to your answer, you meant I could use a processor to artificially generate my delay. I think a concrete implementation would be to Thread.sleep for [message timestamp - (now - delay)] whenever this difference is positive (otherwise we just forward the message downstream immediately). For example, with a 48h delay, a record whose timestamp is 20h old would sleep for the remaining 28h before being forwarded. Am I right? This looks coherent to me since, whether the task crashes or is parallelized among partitions, the processor always ensures the topic (or partition) head is delayed from now, and if so, processes accordingly.

Do you know if there is a way to mix the high-level Kafka Streams DSL with the low-level Processor API, i.e., can I add a processor step when chaining transformations and filters?
I see there is a KStream.process(ProcessorSupplier<K,V> processorSupplier, java.lang.String... stateStoreNames) method that looks close to what I'm looking for, but it returns void and not a KStream.
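
For instance, assuming a KStream.transform(TransformerSupplier, ...) variant is available in this API generation, a low-level delay step could perhaps be chained into the DSL along these lines (rough, untested sketch):

import scala.concurrent.duration._
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.{Transformer, TransformerSupplier}
import org.apache.kafka.streams.processor.ProcessorContext

val delay = 48.hours // assumed delay, as discussed in this thread

// Holds each record until it is at least `delay` old, then passes it through
// unchanged; chainable because transform returns a KStream (unlike process)
val delaySupplier = new TransformerSupplier[Array[Byte], Array[Byte], KeyValue[Array[Byte], Array[Byte]]] {
  override def get() = new Transformer[Array[Byte], Array[Byte], KeyValue[Array[Byte], Array[Byte]]] {
    private var context: ProcessorContext = _
    override def init(context: ProcessorContext) = this.context = context
    override def transform(key: Array[Byte], value: Array[Byte]) = {
      // sleep until the record's timestamp is at least `delay` in the past
      val diff = context.timestamp() - (System.currentTimeMillis - delay.toMillis)
      if (diff > 0) Thread.sleep(diff)
      new KeyValue(key, value)
    }
    override def punctuate(timestamp: Long) = null // nothing to emit on schedule
    override def close() = ()
  }
}

// usage: stream.transform(delaySupplier).to("delayed-topic")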

On a different subject, do you know if there is any hook in the API to plug into and get live metadata/metrics (offsets, lag, backlog, ...) about the topics/partitions we're consuming? We usually emit such metrics to our production monitoring backend to track any error or delay in the consumption of our topics.

Nicolas

Nicolas Colomer

Aug 19, 2016, 1:03:12 PM
to Confluent Platform
Well, with some additional thought, I landed on the following approach using two temporal joins (one for the joined events, and another for the non-joined events in a time window):

val displayStream: KStream[String, Display] = ...
val clickStream: KStream[String, Click] = ...

// Emit click events with their original display (if a join occurred in the observation window)
clickStream
.sjoin(displayStream, JoinWindows.of("occurred-30m-before").before(30.minutes.toMillis))
.to(StringSerde, new CustomSerde[(Click, Display)], "clicked")

// Emit non-click events
displayStream
.sleftJoin(clickStream, JoinWindows.of("occurred-30m-after").after(30.minutes.toMillis))
.scollect({ case (id, (display, None)) => id -> display })
.to(StringSerde, new CustomSerde[Display], "not-clicked")

Note: the s-prefixed methods are Scala implicits that just wrap the original KStream API methods to be more Scala-ish. The scollect method is a combination of KStream.filter and KStream.map taking a Scala PartialFunction as input.
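
For illustration, the scollect wrapper might look something like this (a simplified sketch of the idea, not the actual implementation):

import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.{KStream, KeyValueMapper, Predicate}

object KStreamOps {
  implicit class RichKStream[K, V](val stream: KStream[K, V]) {
    // KStream.filter + KStream.map driven by a Scala PartialFunction
    def scollect[K2, V2](pf: PartialFunction[(K, V), (K2, V2)]): KStream[K2, V2] =
      stream
        .filter(new Predicate[K, V] {
          // keep only the records the partial function can handle
          override def test(key: K, value: V) = pf.isDefinedAt((key, value))
        })
        .map(new KeyValueMapper[K, V, KeyValue[K2, V2]] {
          override def apply(key: K, value: V) = {
            val (k2, v2) = pf((key, value))
            new KeyValue(k2, v2)
          }
        })
  }
}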

Does it look viable to you? I'm still a bit concerned about how the stream synchronization is done under the hood: does it wait for the topic's head to be delayed enough (using metadata or a timestamp extractor), or does it load data in memory / in a local store to do the join?

Anyway, I haven't tried this snippet yet, but I plan to very soon (taking inspiration from the WordCountScalaIntegrationTest).

Cheers,
Nicolas

Nicolas Colomer

Aug 24, 2016, 11:52:36 AM
to Confluent Platform
Hello there,

Some feedback after several rounds of testing and reading.

It looks like the method described in my previous message to detect non-events does not produce the desired output:

// Emit non-click events
displayStream
  .sleftJoin(clickStream, JoinWindows.of("occurred-30m-after").after(30.minutes.toMillis))
  .scollect({ case (id, (display, None)) => id -> display })
  .to(StringSerde, new CustomSerde[Display], "not-clicked")

This stream always instantly outputs the input displays without their clicks, even if clicks were generated after the displays within the observation time window.
I think the JoinWindows.after method does not try to synchronize the streams (i.e., wait for 30 minutes before producing its output in case no join occurs) and instead assumes the streams are already somehow synchronized. Am I right?

So I came up with another solution based on Michael's initial answer: I artificially delay (time-shift) the display stream, publishing the delayed messages into an intermediate topic that I then leftJoin with the click stream.

Here is how:

val builder = new TopologyBuilder()
  .addSource("source", inputTopic)
  .addProcessor("processor", supplier, "source")
  .addSink("sink", outputTopic, "processor")

with the supplier variable equal to:

val supplier = new ProcessorSupplier[Array[Byte], Array[Byte]] {
  override def get() = new Processor[Array[Byte], Array[Byte]] {
    private var context: ProcessorContext = null
    override def init(context: ProcessorContext) = this.context = context
    override def close() = ()
    override def punctuate(timestamp: Long) = ()
    override def process(key: Array[Byte], value: Array[Byte]) = {
      val ts = context.timestamp()            // record timestamp, as provided by the timestamp extractor
      val now = System.currentTimeMillis
      val diff = ts - (now - delay.toMillis)  // time remaining until the record is `delay` old
      if (diff > 0) Thread.sleep(diff)        // block the stream task until then
      context.forward(key, value)             // emit the (now delayed) record downstream
      context.commit()
    }
  }
}
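
For completeness, this topology can then be wired up and started along these lines (a sketch: the application id, broker list, and reliance on byte-array default serdes are placeholder assumptions):

import java.util.Properties
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}

val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "display-delayer") // placeholder id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

val streams = new KafkaStreams(builder, props) // builder is the TopologyBuilder above
streams.start()
// and streams.close() on shutdown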

This has the advantage of being fully stateless, but it duplicates data into another Kafka topic.

I do all this in a separate topology since there is an impedance mismatch between the high-level Kafka Streams DSL and the low-level Processor API (source declaration conflicts when creating KStream instances, among other things).

This implementation runs well, and I wrote integration tests using the pure Kafka Streams high-level API to prove it (see https://github.com/ncolomer/kafka-streams-join), but I have the feeling that I may not be doing things the right way and that my use case should already be addressed by the API somehow.

Any opinion would be welcome at this point!

Cheers,
Nicolas

Here are some links I gathered about this specific joining subject:
Kafka Streams tests from Kafka Github project
Joining streams from Confluent 3.0.0 documentation 