Timestamps preventing Control Center from monitoring custom consumer


Darrel Riekhof

Feb 7, 2018, 5:31:01 PM
to Confluent Platform
I have a Java consumer that I've configured with the Confluent monitoring consumer interceptor. From the consumer's log:

2018-02-07 21:39:52,414 [MonitoringConsumerInterceptor.java:Thread-1:70] - creating interceptor
2018-02-07 21:39:52,524 [     AbstractConfig.java:Thread-1:223] - MonitoringInterceptorConfig values:
    confluent.monitoring.interceptor.publishMs = 15000
    confluent.monitoring.interceptor.topic = _confluent-monitoring
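For reference, this is roughly how the interceptor is enabled in the consumer's properties (a sketch; the class name is inferred from the MonitoringConsumerInterceptor entry in the log above, so double-check it against your interceptor jar):

```
interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
```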



The producer is a Flume Kafka sink.  Flume gets it from another process that's generating events.  I'm not trying to monitor the Flume-to-Kafka sink producer at the moment, just the consumer reading messages out of the topic.

I've also added this in the etc/kafka/server.properties file:

## trying to get consumer metrics working in control center
log.message.format.version=1.0


But I am getting these errors in my consumer log:


2018-02-07 21:39:52,668 [      FooParser.java:Thread-1:109] - Kafka version : 1.0.0
2018-02-07 21:39:52,670 [MonitoringInterceptor.java:Thread-1:153] - interceptor=confluent.monitoring.interceptor.-1 created for client_id=-1 client_type=CONSUMER session= cluster=xxx group=FooConsumer

2018-02-07 21:39:52,676 [MonitoringInterceptor.java:Thread-1:259] - Monitoring Interceptor skipped 1 messages with missing or invalid timestamps for topic foo. The messages were either corrupted or using an older message format. Please verify that all your producers support timestamped messages and that your brokers and topics are all configured with log.message.format.version, and message.format.version >= 0.10.0 respectively. You may also experience this if you are consuming older messages produced to Kafka prior to any of those changes taking place.
2018-02-07 21:39:54,369 [MonitoringInterceptor.java:Thread-1:259] - Monitoring Interceptor skipped 60 messages with missing or invalid timestamps for topic foo. The messages were either corrupted or using an older message format. Please verify that all your producers support timestamped messages and that your brokers and topics are all configured with log.message.format.version, and message.format.version >= 0.10.0 respectively. You may also experience this if you are consuming older messages produced to Kafka prior to any of those changes taking place.


I've been scouring the internet, but I haven't found anything that gives me a hint on how to fix this.  What kind of timestamp is it expecting, and how would I add it, or configure my Flume Kafka sink to add it?

Any advice appreciated.




Darrel Riekhof

Feb 8, 2018, 12:06:23 PM
to Confluent Platform
I've found that adding this to etc/kafka/server.properties makes those errors go away and the consumer works again:

log.message.format.version=1.0
log.message.timestamp.type=LogAppendTime

My shallow understanding is that the broker now uses the timestamp of when the message is written to the topic instead of when the producer sent it.

This is fine for our use case.  But I'd still be interested in learning how I could use the Flume sink client's timestamp, or the timestamp from the process that's creating the events, as the 'official' Kafka record timestamp.
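For what it's worth, with a 0.10+ producer client the record timestamp can be set explicitly when the record is built. A minimal sketch of the idea (the ProducerRecord call in the comment is hypothetical usage requiring kafka-clients on the classpath, and chooseTimestamp is a helper name I made up; a 0.9 client like Flume 1.6's cannot do this at all):

```java
public class RecordTimestamp {
    // Pick the timestamp to attach to the record: the upstream event's own
    // timestamp if it has one, otherwise fall back to "now".
    static long chooseTimestamp(Long eventTsMs, long nowMs) {
        return eventTsMs != null ? eventTsMs : nowMs;
    }

    // With a 0.10+ kafka-clients producer you would then pass it explicitly
    // via the ProducerRecord constructor that takes a timestamp, e.g.:
    //   producer.send(new ProducerRecord<>("foo", null,
    //       chooseTimestamp(eventTsMs, System.currentTimeMillis()), key, value));
}
```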

dan

Feb 8, 2018, 4:27:22 PM
to confluent...@googlegroups.com
do you happen to know what version of the kafka clients flume is using?

dan

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsub...@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/e528b332-7b7e-46d5-9431-2613b85f93a5%40googlegroups.com.

For more options, visit https://groups.google.com/d/optout.

Darrel Riekhof

Feb 8, 2018, 4:53:30 PM
to Confluent Platform
I'm guessing this means the Flume Kafka sink client is using 0.9.

[root@myhost cloudera]# find . -name "*kafka*"
...
./parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/flume-ng/lib/flume-kafka-source-1.6.0-cdh5.12.1.jar
./parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/flume-ng/lib/flume-ng-kafka-sink-1.6.0-cdh5.12.1.jar
./parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/flume-ng/lib/kafka-clients-0.9.0-kafka-2.0.2.jar
./parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/flume-ng/lib/flume-kafka-source.jar
./parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/flume-ng/lib/flume-kafka-channel.jar
./parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/flume-ng/lib/flume-ng-kafka-sink.jar
./parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/flume-ng/lib/kafka_2.10-0.9.0-kafka-2.0.2.jar
./parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/flume-ng/lib/flume-kafka-channel-1.6.0-cdh5.12.1.jar

dan

Feb 8, 2018, 5:17:24 PM
to confluent...@googlegroups.com
interesting. timestamps weren't around till 0.10, so if you are using a producer from a previous version you need LogAppendTime, since the default of CreateTime is only sent by 0.10+ producers. if you do this, you likely want to change this setting only for the topics being produced to by the 0.9 clients, unless you are ok with LogAppendTime for all your topics.
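The per-topic override can be applied with the kafka-configs tool along these lines (a sketch against a Kafka 1.0-era cluster; verify the flags and the topic name against your own setup):

```
kafka-configs --zookeeper localhost:2181 --alter --entity-type topics \
  --entity-name foo --add-config message.timestamp.type=LogAppendTime
```

The topic-level message.timestamp.type setting then takes precedence over the broker-wide log.message.timestamp.type for that topic only.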

one other issue is that you will not really get any useful info by instrumenting these consumers with the confluent interceptors, since we rely on comparing the consumer/producer timestamps to do our verification.

dan


Darrel Riekhof

Feb 8, 2018, 7:06:02 PM
to Confluent Platform
I am getting a ton of really annoying latency messages on my consumer console:

2018-02-08 23:27:09,375 [  MonitoringMetrics.java:Thread-1:62] - Negative message latency=-3 ms
2018-02-08 23:27:09,375 [  MonitoringMetrics.java:Thread-1:62] - Negative message latency=-5 ms

I assume it's because the client and server clocks are off by a few ticks.

I thought I was getting some useful info out of the consumer interceptor because it was at least showing us how many messages it was consuming in the Control Center Data Streams UI.

dan

Feb 12, 2018, 11:29:38 AM
to confluent...@googlegroups.com
are you able to set up ntp to get the clocks in sync? https://docs.confluent.io/current/control-center/docs/concept.html#latency

if you don't mind the clocks being off, you can modify the log4j config to not print those messages.
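For example, in the consumer's log4j.properties, something like this should silence the per-message latency lines (the logger name here is a guess based on the MonitoringMetrics class in the log output; adjust it to match the package in your interceptor jar):

```
# raise the level for the metrics class so per-message latency warnings are suppressed
log4j.logger.io.confluent.monitoring.clients.interceptor.MonitoringMetrics=ERROR
```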

dan


Darrel Riekhof

Feb 12, 2018, 12:58:10 PM
to Confluent Platform
I modified the log4j config like you suggested, thanks.  We have ntp syncing, but I guess not well enough.