--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsub...@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/924ea89c-b0a3-4c42-b299-d89a1489c955%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
Nicolas,

Yes, this is possible using the Processor API. See my answer on Stack Overflow:

Alternatively, you could also consider starting/stopping your Kafka Streams application to achieve 48h intervals.

That said, perhaps there's a better approach to your problem, but this might depend on, e.g., why exactly you need to process after a 48h delay (is that because the lookup data arrives two days later? Or...?)
On Wed, Aug 17, 2016 at 10:37 PM, Nicolas Colomer <ino...@gmail.com> wrote:
Hi there,

I was wondering how to achieve the following use case with Kafka Streams: we have a topic containing records that we'd like to process after, say, a 48h delay, so that once this delay has elapsed we can look up whatever database we need and generate new business facts.

We currently have similar jobs implemented using Samza. We managed to consume Kafka topics with a delay there using a custom MessageChooser that polls a message and either accepts it or loops while its timestamp falls inside a sliding time window.

Is this at least possible?

Thanks for your thoughts and guidance,
Nicolas
Thinking a bit more to the problem and looking at the high-level Kafka Streams DSL documentation, I found that the following KStream method might be our solution:
KStream.leftJoin(KStream, ValueJoiner, JoinWindows) using JoinWindows.after(long timeDifference) (or within(long timeDifference))

Indeed, what we are trying to do is a temporal left join, which would allow us to emit a message even if no join occurred.
val displayStream: KStream[String, Display] = ...
val clickStream: KStream[String, Click] = ...
// Emit click events with their original display (if a join occurred in the observation window)
clickStream
.sjoin(displayStream, JoinWindows.of("occurred-30m-before").before(30.minutes.toMillis))
.to(StringSerde, new CustomSerde[(Click, Display)], "clicked")
// Emit non-click events
displayStream
.sleftJoin(clickStream, JoinWindows.of("occurred-30m-after").after(30.minutes.toMillis))
.scollect({ case (id, (display, None)) => id -> display })
.to(StringSerde, new CustomSerde[Display], "not-clicked")
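For intuition, the asymmetric window-membership test that JoinWindows.of(...).before(...) implies in the first example can be sketched as a pure function. This is an illustrative sketch only; the object and method names below are made up for the example and are not part of the Kafka Streams API:

```scala
// Illustrative sketch (not Kafka Streams API): a click at `clickTs` joins a
// display at `displayTs` when the display occurred no more than `beforeMs`
// milliseconds *before* the click, and not after it.
object WindowCheck {
  def joinsWithin(displayTs: Long, clickTs: Long, beforeMs: Long): Boolean = {
    val gap = clickTs - displayTs
    gap >= 0 && gap <= beforeMs
  }
}
```

With a 30-minute window, a display seen 10 minutes before the click would join, while a display seen 40 minutes before (or any display occurring after the click) would not.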
val supplier = new ProcessorSupplier[Array[Byte], Array[Byte]] {
  override def get() = new Processor[Array[Byte], Array[Byte]] {
    private var context: ProcessorContext = null

    override def init(context: ProcessorContext) = this.context = context

    override def close() = ()

    override def punctuate(timestamp: Long) = ()

    override def process(key: Array[Byte], value: Array[Byte]) = {
      // Block until the record's timestamp is at least `delay` old,
      // then forward it downstream and commit.
      val ts = context.timestamp()
      val now = System.currentTimeMillis
      val diff = ts - (now - delay.toMillis)
      if (diff > 0) Thread.sleep(diff)
      context.forward(key, value)
      context.commit()
    }
  }
}

val builder = new TopologyBuilder()
  .addSource("source", inputTopic)
  .addProcessor("processor", supplier, "source")
  .addSink("sink", outputTopic, "processor")
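The delay arithmetic inside process can be factored into a pure helper to make it easier to reason about. This is a sketch only; the name DelayCalc is illustrative and not part of any API:

```scala
// Illustrative helper: how long process() would need to sleep before
// forwarding a record stamped `ts`, given the wall clock `now` and the
// configured `delayMs`. Returns 0 when the record is already old enough
// to be forwarded immediately.
object DelayCalc {
  def remainingDelayMs(ts: Long, now: Long, delayMs: Long): Long =
    math.max(0L, ts - (now - delayMs))
}
```

For example, a record stamped 1000 with a 1500 ms delay still needs 500 ms of waiting at wall-clock time 2000, and none at all by time 5000.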