Preserving timestamps across a Kafka Streams processor


Jeff Faust

Feb 8, 2017, 5:14:18 PM
to Confluent Platform

I've got a stream processor topology that does enrichment on a stream of metrics and looks something like this:


input_topic -> mapValues -> leftJoin(otherTable) -> map -> to -> output_topic
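
The pipeline above could be rendered roughly like this in the 0.10.x Streams DSL. Topic names, the store name, the String types, and the enrichment/re-keying lambdas are all placeholders for my actual logic:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class EnrichmentTopology {
    public static KStreamBuilder build() {
        KStreamBuilder builder = new KStreamBuilder();

        // the table used for enrichment
        KTable<String, String> otherTable =
            builder.table(Serdes.String(), Serdes.String(), "other_topic", "other-store");

        KStream<String, String> enriched = builder
            .stream(Serdes.String(), Serdes.String(), "input_topic")
            .mapValues(v -> v.trim())                          // placeholder enrichment
            .leftJoin(otherTable,
                      (v, other) -> other == null ? v : v + "|" + other)
            .map((k, v) -> new KeyValue<>(k, v));              // placeholder re-keying

        enriched.to(Serdes.String(), Serdes.String(), "output_topic");
        return builder;
    }
}
```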


I’d like to preserve the input timestamps across these transforms, so that each output message carries the same timestamp as its corresponding input message. Instead, the output messages are stamped with their creation time (the current time). I have downstream processors that consume the output topic and do time-windowed aggregations; they need the original event time for the results to make sense.


I could embed the timestamp in the message body, but that would end up propagating through all of my types. It would break the TimeWindows()-based aggregations and make aggregating more awkward. It also seems redundant, since every message already has this other “official” timestamp traveling with it.


The low-level Processor API supports preserving timestamps across input and output for 1:1 transforms, so I assumed the Kafka Streams DSL would too. This seems like it would be a standard pattern, but I can’t find anything about how to do it.

Matthias J. Sax

Feb 9, 2017, 1:03:21 AM
to confluent...@googlegroups.com
Hi,

Kafka Streams uses the current "stream time" when writing records to topics
(for both the DSL and the Processor API -- aka PAPI).

Thus, your statement is not correct:

>> The low level processor API supports preserving timestamps across input
>> and output for 1:1 transforms


I also do not understand what you mean by

>> Instead, the output messages get
>> timestamped with their creation time, the current time.

Could it be that your output topic is configured with

log.message.timestamp.type = LogAppendTime

rather than "CreateTime"? The LogAppendTime configuration tells the broker
to overwrite the embedded record timestamp with the current broker system
time when the record is appended to the topic.
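
If it helps, you can inspect (and, if needed, change) the per-topic override with the kafka-configs tool; the topic-level counterpart of the broker's log.message.timestamp.type is message.timestamp.type. The topic name and ZooKeeper address below are placeholders:

```shell
# Describe any per-topic config overrides on the output topic;
# look for message.timestamp.type in the output.
bin/kafka-configs.sh --zookeeper localhost:2181 \
  --entity-type topics --entity-name output_topic --describe

# If it is set to LogAppendTime, switch the topic back to CreateTime:
bin/kafka-configs.sh --zookeeper localhost:2181 \
  --entity-type topics --entity-name output_topic \
  --alter --add-config message.timestamp.type=CreateTime
```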

For details about this see
http://docs.confluent.io/current/streams/concepts.html#time


Nevertheless, the "stream time" that is used for result records is an
internally tracked time (based on whatever the TimestampExtractor returns)
that represents the "progress" of your application. How "stream time"
gets advanced is a little complex and not relevant for this discussion.
The point is that "stream time" is not the same as the current record's
timestamp (and if it is, then only by coincidence).

So you will need to embed the timestamp in your record body to preserve
it. Furthermore, you need to provide a custom TimestampExtractor to base
further steps on the "new" timestamp.
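
A minimal sketch of such an extractor, written against the 0.10.x API (where extract() takes a single argument). The MetricEvent type and its getEventTime() accessor are hypothetical stand-ins for whatever value type carries the embedded timestamp:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Hypothetical value type that carries the event time in the body.
class MetricEvent {
    private final long eventTime;
    MetricEvent(long eventTime) { this.eventTime = eventTime; }
    long getEventTime() { return eventTime; }
}

public class EmbeddedTimeExtractor implements TimestampExtractor {

    @Override
    public long extract(ConsumerRecord<Object, Object> record) {
        Object value = record.value();
        if (value instanceof MetricEvent) {
            // use the timestamp embedded in the message body
            return ((MetricEvent) value).getEventTime();
        }
        // fall back to the record's own timestamp if the body carries none
        return record.timestamp();
    }
}

// Register it when configuring the application, e.g.:
//   props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
//             EmbeddedTimeExtractor.class.getName());
```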

>> It would break the TimeWindows()
>> based aggregations and make aggregating more awkward.

Using a custom TimestampExtractor should prevent the TimeWindows()-based
aggregations from breaking...

Hope this helps. Let us know, if you have further questions.


-Matthias
