I have a Java consumer that I've configured with the Confluent consumer interceptor. From the consumer's log:
2018-02-07 21:39:52,414 [MonitoringConsumerInterceptor.java:Thread-1:70] - creating interceptor
2018-02-07 21:39:52,524 [ AbstractConfig.java:Thread-1:223] - MonitoringInterceptorConfig values:
confluent.monitoring.interceptor.publishMs = 15000
confluent.monitoring.interceptor.topic = _confluent-monitoring
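For context, an interceptor like this is normally registered through the consumer properties. The following is a minimal sketch, not the exact code from this consumer: the broker address, the deserializers, and the class name `InterceptorConfigSketch` are placeholders, and the interceptor class name is the one I understand Confluent documents for consumers. The `publishMs` and topic values mirror the log output above.

```java
import java.util.Properties;

public class InterceptorConfigSketch {

    // Builds consumer properties that register the Confluent monitoring interceptor.
    // bootstrapServers and groupId are supplied by the caller (placeholders here).
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // Assumed deserializers; the real consumer may use different ones.
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Registers the interceptor with the Kafka consumer.
        props.put("interceptor.classes",
                "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor");
        // Values as reported in the MonitoringInterceptorConfig log lines.
        props.put("confluent.monitoring.interceptor.publishMs", "15000");
        props.put("confluent.monitoring.interceptor.topic", "_confluent-monitoring");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps("localhost:9092", "FooConsumer");
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

These properties would then be passed to the `KafkaConsumer` constructor.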
The producer is a Flume Kafka sink. Flume gets the events from another process that's generating them. I'm not trying to monitor the Flume-to-Kafka sink producer at the moment, just the consumer reading messages out of the topic.
I've also added this in the etc/kafka/server.properties file:
## trying to get consumer metrics working in control center
log.message.format.version=1.0
But I am still getting these errors in my consumer log:
2018-02-07 21:39:52,668 [ FooParser.java:Thread-1:109] - Kafka version : 1.0.0
2018-02-07 21:39:52,670 [MonitoringInterceptor.java:Thread-1:153] - interceptor=confluent.monitoring.interceptor.-1 created for client_id=-1 client_type=CONSUMER session= cluster=xxx group=FooConsumer
2018-02-07 21:39:52,676 [MonitoringInterceptor.java:Thread-1:259] - Monitoring Interceptor skipped 1 messages with missing or invalid timestamps for topic foo. The messages were either corrupted or using an older message format. Please verify that all your producers support timestamped messages and that your brokers and topics are all configured with log.message.format.version, and message.format.version >= 0.10.0 respectively. You may also experience this if you are consuming older messages produced to Kafka prior to any of those changes taking place.
2018-02-07 21:39:54,369 [MonitoringInterceptor.java:Thread-1:259] - Monitoring Interceptor skipped 60 messages with missing or invalid timestamps for topic foo. The messages were either corrupted or using an older message format. Please verify that all your producers support timestamped messages and that your brokers and topics are all configured with log.message.format.version, and message.format.version >= 0.10.0 respectively. You may also experience this if you are consuming older messages produced to Kafka prior to any of those changes taking place.
I've been scouring the internet, but I haven't found anything that hints at how to fix this. What kind of timestamp is the interceptor expecting, and how would I add it, or configure my Flume Kafka sink to add it?
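To narrow this down, one thing that could be checked is what timestamp each consumed record actually carries. Below is a minimal sketch of such a check, under the assumption that the Kafka Java client reports `-1` from `ConsumerRecord.timestamp()` when a record has no timestamp, and a non-negative value in epoch milliseconds otherwise. The class `TimestampCheckSketch` and the method `describe` are hypothetical helpers, and whether the interceptor applies exactly this rule is an assumption.

```java
import java.time.Instant;

public class TimestampCheckSketch {

    // Sentinel the Kafka Java client uses for "no timestamp" (assumed: -1).
    static final long NO_TIMESTAMP = -1L;

    // Classifies a timestamp value such as the one returned by
    // ConsumerRecord.timestamp() in a real consumer loop.
    static String describe(long timestampMs) {
        if (timestampMs == NO_TIMESTAMP) {
            return "missing (record likely written in a pre-0.10 message format)";
        }
        if (timestampMs < 0) {
            return "invalid (negative value " + timestampMs + ")";
        }
        return "epoch millis: " + Instant.ofEpochMilli(timestampMs);
    }

    public static void main(String[] args) {
        // Sample values only; in the consumer these would come from polled records.
        long[] samples = { -1L, 1518039592414L };
        for (long ts : samples) {
            System.out.println(ts + " -> " + describe(ts));
        }
    }
}
```

In the real consumer, logging `describe(record.timestamp())` for the first few records of topic foo would show whether the skipped messages simply have no timestamp set.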
Any advice appreciated.