Kafka Streams Invalid Timestamp

1,183 views
Skip to first unread message

Atharva Inamdar

unread,
Aug 3, 2016, 9:40:29 AM8/3/16
to Confluent Platform
I have recently upgraded Kafka to 0.10 from 0.9.*. Using protocol 0.10.0.0 my old producers and consumers work fine. 
When Using Kafka Streams, I have a simple DAG. srcStream->flaMap->countByKey->targetStream. The topic I'm consuming from does not have timestamp on messages. If I run streams application without writing back to Kafka topic targetStream (print out countByKey, instead of calling .to() ), everything works. It is able to read messages from srcStream topic without timestamp and process without issues.

As soon as I write back to a Kafka topic targetStream using `.to()` method, I receive an 
Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: Invalid timestamp -1
 at org
.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)

My Streams application produces a String Key and Long Value.
KTable<String, Long> viewCountBySegment = trackingEvents.flatMap((k, v) -> flatmapEvent(v)).countByKey("ViewCountBySegment");
viewCountBySegment.print(Serdes.String(),Serdes.Long());
viewCountBySegment.toStream().to("analytics_views");

line 3 above is the one causing issues.

How can I resolve this? 

Damian Guy

unread,
Aug 3, 2016, 11:28:31 AM8/3/16
to Confluent Platform
Hi,

The timestamp field was added to ProducerRecord and ConsumerRecord in 0.10.0 - any messages produced by a Kafka 0.9.0 producer will have the timestamp field set to Record.NO_TIMESTAMP, which is -1.

You can work around this by configuring a TimestampExtractor in your StreamsConfig, i.e, 
Properties props = ...
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class)

You can create your own TimestampExtractor if you'd like to extract the time from the value.

HTH,
Damian  

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platf...@googlegroups.com.
To post to this group, send email to confluent...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/1553063d-b891-4279-b775-4dfb3ae089e0%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Atharva Inamdar

unread,
Aug 3, 2016, 11:56:35 AM8/3/16
to Confluent Platform
Thanks, I was aware timestamp, as added in 0.10 but I was confused by the fact that it could read the topic but not produce to another one.

I do have another issue now whereby my streams application doesn't consume anymore. could it be related to having mixed message some with timestamp and some not in same topic?

I get no warning/errors. I've also tried changing the application ID to make it think it's a different application consumer. but still no luck consuming new events

Thanks.


On Wednesday, 3 August 2016 16:28:31 UTC+1, Damian Guy wrote:
Hi,

The timestamp field was added to ProducerRecord and ConsumerRecord in 0.10.0 - any messages produced by a Kafka 0.9.0 producer will have the timestamp field set to Record.NO_TIMESTAMP, which is -1.

You can work around this by configuring a TimestampExtractor in your StreamsConfig, i.e, 
Properties props = ...
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class)

You can create your own TimestampExtractor if you'd like to extract the time from the value.

HTH,
Damian  

Atharva Inamdar

unread,
Aug 3, 2016, 11:57:47 AM8/3/16
to Confluent Platform
I should mention I'm producing events using Confluent Python Kafka client.


On Wednesday, 3 August 2016 16:28:31 UTC+1, Damian Guy wrote:
Hi,

The timestamp field was added to ProducerRecord and ConsumerRecord in 0.10.0 - any messages produced by a Kafka 0.9.0 producer will have the timestamp field set to Record.NO_TIMESTAMP, which is -1.

You can work around this by configuring a TimestampExtractor in your StreamsConfig, i.e, 
Properties props = ...
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class)

You can create your own TimestampExtractor if you'd like to extract the time from the value.

HTH,
Damian  

ha...@ebiw.com

unread,
Sep 7, 2017, 8:26:10 AM9/7/17
to Confluent Platform
hi, 
     This error comes because of kafka version difference. May be your producer or Stream program runs on different kafka version. I will suggest you to work with kafka v 11 for all programs

Matthias J. Sax

unread,
Sep 7, 2017, 12:35:28 PM9/7/17
to confluent...@googlegroups.com
That's not necessarily true.

Client as of 0.10.0 are backwards compatible to older brokers. For
Streams API, backwards compatibility starts with 0.10.1.

See
http://docs.confluent.io/current/streams/upgrade-guide.html#compatibility


-Matthias

On 9/7/17 5:26 AM, ha...@ebiw.com wrote:
> hi, 
>      This error comes because of kafka version difference. May be your
> producer or Stream program runs on different kafka version. I will
> suggest you to work with kafka v 11 for all programs
>  
> On Wednesday, August 3, 2016 at 7:10:29 PM UTC+5:30, Atharva Inamdar wrote:
>
> I have recently upgraded Kafka to 0.10 from 0.9.*. Using protocol
> 0.10.0.0 my old producers and consumers work fine. 
> When Using Kafka Streams, I have a simple DAG.
> srcStream->flaMap->countByKey->targetStream. The topic I'm consuming
> from does not have timestamp on messages. If I run streams
> application without writing back to Kafka topic targetStream (print
> out countByKey, instead of calling .to() ), everything works. It is
> able to read messages from srcStream topic without timestamp and
> process without issues.
>
> As soon as I write back to a Kafka topic targetStream using `.to()`
> method, I receive an 
> |
> Exceptioninthread
> "StreamThread-1"java.lang.IllegalArgumentException:Invalidtimestamp -1
>  at
> org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
> |
>
> My Streams application produces a String Key and Long Value.
>
> KTable<String, Long> viewCountBySegment = trackingEvents.flatMap((k, v) -> flatmapEvent(v)).countByKey("ViewCountBySegment");
> viewCountBySegment.print(Serdes.String(),Serdes.Long());
> viewCountBySegment.toStream().to("analytics_views");
>
>
> line 3 above is the one causing issues.
>
> How can I resolve this? 
>
> --
> You received this message because you are subscribed to the Google
> Groups "Confluent Platform" group.
> To unsubscribe from this group and stop receiving emails from it, send
> an email to confluent-platf...@googlegroups.com
> <mailto:confluent-platf...@googlegroups.com>.
> To post to this group, send email to confluent...@googlegroups.com
> <mailto:confluent...@googlegroups.com>.
> To view this discussion on the web visit
> https://groups.google.com/d/msgid/confluent-platform/bf76d6eb-49d1-4e45-8256-f1b6f8126daa%40googlegroups.com
> <https://groups.google.com/d/msgid/confluent-platform/bf76d6eb-49d1-4e45-8256-f1b6f8126daa%40googlegroups.com?utm_medium=email&utm_source=footer>.
signature.asc

Harsh Manyase

unread,
Sep 7, 2017, 12:44:51 PM9/7/17
to confluent...@googlegroups.com
Ya but. Your producer and processing program need to b on same Kafka version.And Kafka modified some method from version 9 so . That's what I  solve my issue. I shared with you.

Regards,
Harsh manyase
EBIW Info Analytics Pvt. Ltd.


From: confluent...@googlegroups.com <confluent...@googlegroups.com> on behalf of Matthias J. Sax <matt...@confluent.io>
Sent: Thursday, September 7, 2017 10:05:21 PM
To: confluent...@googlegroups.com
Subject: Re: Kafka Streams Invalid Timestamp
 
You received this message because you are subscribed to a topic in the Google Groups "Confluent Platform" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/confluent-platform/5oT0GRztPBo/unsubscribe.
To unsubscribe from this group and all its topics, send an email to confluent-platf...@googlegroups.com.
To post to this group, send email to confluent...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/8a93cedf-5896-1fe0-49c0-10a8f0b201b8%40confluent.io.
Reply all
Reply to author
Forward
0 new messages