Kafka Streams - use timestamp


Dan Kinsley

Jan 18, 2017, 9:44:54 AM
to Confluent Platform
I see that streams require a "timestamp extractor" for stateful processing such as windows, joins, etc. However, is it possible to retrieve the timestamp from a "map" function (or maybe the deserializer)? My source data does not have a timestamp so I am trying to use the event time to augment the output data. 

Michael Noll

Jan 18, 2017, 9:53:11 AM
to confluent...@googlegroups.com
Dan,

for clarification:

> My source data does not have a timestamp so I am trying to use the event time to augment the output data.

If your source data does not have any timestamp, how would you be able to augment the output data with event-time, which (by definition, almost) is based on information in the source data itself? Do you mean processing-time, i.e. the time when your source data happens to be processed by a Streams application?

-Michael





--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsub...@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/5aaa16bf-bee3-4f85-9f2c-0d7ccabe547e%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.



Dan Kinsley

Jan 18, 2017, 11:15:03 AM
to Confluent Platform
Thanks for the response Michael. I was under the impression that each record has a timestamp of when the record was produced. However, the "event data" itself has no such timestamp. I could explicitly add a timestamp to the event data when it is produced but this seems redundant if it is also being stored in the record ( ConsumerRecord.timestamp() ). 

When I map over events, I get just the "key" and "data", but not the timestamp extracted via ConsumerRecordTimestampExtractor. Is there any way to access this along with the event key / data?

Forgive me if I am missing something obvious, still new to Kafka :-)


Matthias J. Sax

Jan 18, 2017, 12:50:22 PM
to confluent...@googlegroups.com
Right now, you cannot access a record's timestamp in map(). However, you
can use transform() instead of map(): it passes a context object to the
init() method, and that object is updated under the hood for each incoming
record, so it lets you get the current record's timestamp.

-Matthias
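
For readers who want to see the shape of this pattern, here is a minimal, dependency-free sketch. It is not Kafka Streams code: RecordContext and SimpleTransformer are hypothetical stand-ins that only mirror the parts of ProcessorContext (its timestamp() method) and Transformer (init() and transform()) mentioned above, and Timestamped, TimestampAttacher, MutableContext and TimestampDemo are illustration-only names. In a real application you would implement the library's own Transformer interface and hand it to KStream#transform(); the exact signatures depend on your Kafka Streams version.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the one ProcessorContext method used here.
interface RecordContext {
    long timestamp();
}

// Hypothetical stand-in mirroring the init()/transform() shape of a Transformer.
interface SimpleTransformer<K, V, R> {
    void init(RecordContext context);
    R transform(K key, V value);
}

// Output type: the original value plus the record timestamp.
final class Timestamped<V> {
    final V value;
    final long timestamp;

    Timestamped(V value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Keeps the context handed to init() and reads the timestamp per record.
final class TimestampAttacher<K, V> implements SimpleTransformer<K, V, Timestamped<V>> {
    private RecordContext context;

    @Override
    public void init(RecordContext context) {
        this.context = context; // same object is reused for every record
    }

    @Override
    public Timestamped<V> transform(K key, V value) {
        // The runtime has already pointed the context at the current record.
        return new Timestamped<>(value, context.timestamp());
    }
}

// Toy context that a driver mutates before each record, imitating the runtime.
final class MutableContext implements RecordContext {
    long current;

    @Override
    public long timestamp() {
        return current;
    }
}

public class TimestampDemo {
    // Feeds (timestamp, value) pairs through the transformer, one at a time.
    static List<Timestamped<String>> run(long[] timestamps, String[] values) {
        MutableContext context = new MutableContext();
        TimestampAttacher<String, String> attacher = new TimestampAttacher<>();
        attacher.init(context);

        List<Timestamped<String>> out = new ArrayList<>();
        for (int i = 0; i < values.length; i++) {
            context.current = timestamps[i]; // "updated under the hood"
            out.add(attacher.transform("key-" + i, values[i]));
        }
        return out;
    }

    public static void main(String[] args) {
        long[] timestamps = {1000L, 2000L};
        String[] values = {"created", "shipped"};
        for (Timestamped<String> record : run(timestamps, values)) {
            System.out.println(record.value + " @ " + record.timestamp);
        }
    }
}
```

The point of the sketch is that transform() never receives the timestamp as an argument; it reads it from the context object captured once in init(), which is why a plain map() function cannot see it.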