Kafka streams ingestion-time

Shannon Ma

Feb 9, 2017, 12:07:33 PM
to Confluent Platform
Hi,

Is there a way to get the ingestion time of a message during stream processing?

Thanks
Shannon

Matthias J. Sax

Feb 9, 2017, 2:00:00 PM
to confluent...@googlegroups.com
If you want broker ingestion time, you need to reconfigure the
topic on the broker side.

log.message.timestamp.type must be set to LogAppendTime (default is
CreateTime). That is the broker-wide setting; the per-topic override is
message.timestamp.type.

cf. http://kafka.apache.org/documentation/#brokerconfigs

This tells the broker to overwrite the record's metadata timestamp with
the current broker system time when a record gets appended to the log.

Thus, on the Streams side, you can just use the default TimestampExtractor.

cf. http://docs.confluent.io/current/streams/concepts.html#time
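
A minimal sketch of what the setting changes, assuming a stand-alone model rather than real broker code. The class `BrokerAppendModel`, its `Mode` enum, and `storedTimestamp` are hypothetical names made up for illustration; only the two mode names mirror the CreateTime and LogAppendTime values mentioned above.

```java
// Hypothetical model of the broker's choice of record timestamp.
// None of these names are Kafka classes; they only illustrate the rule.
class BrokerAppendModel {

    // Mirrors the two values of log.message.timestamp.type.
    enum Mode { CREATE_TIME, LOG_APPEND_TIME }

    // Returns the timestamp stored in the record's metadata after the append.
    static long storedTimestamp(Mode mode, long producerTimestampMs, long brokerNowMs) {
        if (mode == Mode.LOG_APPEND_TIME) {
            // The broker overwrites whatever the producer sent with its own clock.
            return brokerNowMs;
        }
        // CreateTime: the producer-supplied timestamp is kept as-is.
        return producerTimestampMs;
    }
}
```

With LogAppendTime, a consumer or Streams application that reads the record's metadata timestamp therefore sees the broker's append time and no longer sees the producer's value.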


-Matthias
> --
> You received this message because you are subscribed to the Google
> Groups "Confluent Platform" group.
> To unsubscribe from this group and stop receiving emails from it, send
> an email to confluent-platf...@googlegroups.com
> <mailto:confluent-platf...@googlegroups.com>.
> To post to this group, send email to confluent...@googlegroups.com
> <mailto:confluent...@googlegroups.com>.
> To view this discussion on the web visit
> https://groups.google.com/d/msgid/confluent-platform/6d6fb310-9a9e-4b13-9db6-b187bd62ff36%40googlegroups.com
> <https://groups.google.com/d/msgid/confluent-platform/6d6fb310-9a9e-4b13-9db6-b187bd62ff36%40googlegroups.com?utm_medium=email&utm_source=footer>.
> For more options, visit https://groups.google.com/d/optout.

Shannon Ma

Feb 9, 2017, 2:29:50 PM
to Confluent Platform
Maybe I used the wrong name. I have my own TimestampExtractor for event time, but I would also like to know the processing(?) time.

Matthias J. Sax

Feb 9, 2017, 2:42:43 PM
to confluent...@googlegroups.com
Depends:

If you want to base your application logic (windows etc) on processing
time, you can just use WallClockTimestampExtractor.

If you just want to augment your data with processing time information,
you can call System.currentTimeMillis() within your custom
TimestampExtractor and add this value to your record.
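
A minimal sketch of the second option, assuming plain Java with no Kafka dependency. `ProcessingTimeStamp`, `Stamped`, and `stamp` are hypothetical names; the clock is passed in as a `LongSupplier` so the helper can be tested, and in a real application it would be `System::currentTimeMillis`. The event time is kept alongside, so event-time logic is unaffected.

```java
import java.util.function.LongSupplier;

// Hypothetical helper: attaches the wall-clock processing time to a value
// while keeping the event time that the custom extractor already derives.
class ProcessingTimeStamp {

    // Hypothetical wrapper carrying the payload plus both notions of time.
    static final class Stamped<V> {
        final V value;
        final long eventTimeMs;     // taken from the payload (event time)
        final long processedAtMs;   // wall-clock time when this step ran

        Stamped(V value, long eventTimeMs, long processedAtMs) {
            this.value = value;
            this.eventTimeMs = eventTimeMs;
            this.processedAtMs = processedAtMs;
        }
    }

    // Reads the clock once and records the result next to the event time.
    static <V> Stamped<V> stamp(V value, long eventTimeMs, LongSupplier clock) {
        return new Stamped<>(value, eventTimeMs, clock.getAsLong());
    }
}
```

A call such as `ProcessingTimeStamp.stamp(txn, txnDateMs, System::currentTimeMillis)` (with `txn` and `txnDateMs` standing in for your own data) could be made wherever the record is handled, for example in a `mapValues` step.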


-Matthias


Eno Thereska

Feb 10, 2017, 9:55:45 AM
to Confluent Platform
You could just look up the current wallclock time in your processing, right?

Eno


Shannon Ma

Feb 10, 2017, 10:28:21 AM
to Confluent Platform
My use case is that in my first Kafka Streams application I build a state store, and in my second Kafka Streams application I fetch from that store. The same messages go through both streams, so when the second logic processes the current message (txn1), there might already be a newer txn (txn2) in the store, put there by the first stream. In that case I need to detect txn2 and ignore it while processing txn1, and I cannot use the txn date for this. So if the ingestion time is not available, I need to add a timestamp to the txn in the first logic.

Thanks
Shannon

Matthias J. Sax

Feb 10, 2017, 5:02:49 PM
to confluent...@googlegroups.com
I guess you will need to add the timestamp to the records.
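
A minimal sketch of how such a timestamp could be used in the second application, assuming plain Java with the store modelled as a list. `StoreVisibility`, `StampedTxn`, `storedAtMs`, and `visibleTo` are hypothetical names, and the sketch assumes both applications see the same stamp for a given txn (for example because the first one writes it into the record). Txns stamped in the same millisecond are all treated as visible, so a real application might need a finer tie-breaker.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: the first application stamps each txn when it writes it
// to the store; the second one skips entries stamped later than the txn it is
// currently processing.
class StoreVisibility {

    static final class StampedTxn {
        final String id;
        final long storedAtMs;   // stamp added by the first application

        StampedTxn(String id, long storedAtMs) {
            this.id = id;
            this.storedAtMs = storedAtMs;
        }
    }

    // Returns the store entries stamped no later than the current txn,
    // which includes the current txn itself if it is in the store.
    static List<StampedTxn> visibleTo(StampedTxn current, List<StampedTxn> storeContents) {
        List<StampedTxn> visible = new ArrayList<>();
        for (StampedTxn t : storeContents) {
            if (t.storedAtMs <= current.storedAtMs) {
                visible.add(t);
            }
        }
        return visible;
    }
}
```

With txn1 stamped at 100 and txn2 at 200, processing txn1 sees only txn1, while processing txn2 sees both.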

-Matthias