I found this exception when the "consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor" was configured . Below is the segment of the log:
ERROR Monitoring Interceptor failed to record message metrics for topic: upload_flow, partition: 4 (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor:146)
java.lang.IllegalArgumentException: Invalid timestamp -1
at io.confluent.monitoring.clients.interceptor.TimeBuckets.get(TimeBuckets.java:65)
at io.confluent.monitoring.clients.interceptor.MonitoringTimeBuckets.record(MonitoringTimeBuckets.java:61)
at io.confluent.monitoring.clients.interceptor.MonitoringInterceptor.recordMessageMetric(MonitoringInterceptor.java:139)
at io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor.onConsume(MonitoringConsumerInterceptor.java:48)
at org.apache.kafka.clients.consumer.internals.ConsumerInterceptors.onConsume(ConsumerInterceptors.java:57)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:952)
at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:305)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:222)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
In fact I used schema server before transfer records into kafka, Below is the schema for topic upload_flow :
{
"fields": [
{
"name": "id",
"type": "string"
},
{
"name": "timestamp",
"type": "long"
},
{
"name": "peer_id",
"type": "string"
},
{
"name": "upload",
"type": "long"
},
{
"default": null,
"name": "duration",
"type": [
"null",
"long"
]
},
{
"name": "input_time",
"type": "long"
},
{
"name": "output_time",
"type": "long"
},
{
"default": null,
"name": "play_type",
"type": [
"null",
"string"
]
},
{
"name": "public_ip",
"type": "string"
}
],
"name": "upload_flow",
"type": "record"
}
And This is a record in kafka:
{"id":"10.103.0.4:770052:987742508","timestamp":1472182628479,"peer_id":"0001000714724E5191DC23AD8065FCD8","upload":0,"duration":{"long":60},"input_time":1472182628479,"output_time":1472182628614,"play_type":null,"public_ip":"*.*.*.*"}
Because the type of timestamp is limited with long I cannot image the the Monitor report this java.lang.IllegalArgumentException .
Anyone can give some tips?
Regards