Hi Team,
We are using the Kafka Connect Elasticsearch sink connector to write Avro records from a Kafka topic into an Elasticsearch index. With a minimal load, such as manually pushing a few records, the connector works fine and the index is updated.
However, during bulk loads we get multiple exceptions. We have tried updating the configuration, but it still breaks at some point.
Our Elasticsearch sink connector configuration is as follows:
tasks.max=5
batch.size=500
key.ignore=true
max.buffered.records=2000
schema.ignore=true
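For completeness, the full JSON body we POST to the Connect REST API looks roughly like the following (the connector name, topic, connection URL, and type.name below are placeholders, not our real values):

```json
{
  "name": "elasticsearch-sink-connector-1",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "our-avro-topic",
    "connection.url": "http://elasticsearch-host:9200",
    "type.name": "kafka-connect",
    "tasks.max": "5",
    "batch.size": "500",
    "key.ignore": "true",
    "max.buffered.records": "2000",
    "schema.ignore": "true"
  }
}
```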
And the Connect worker properties have been updated as follows:
consumer.session.timeout.ms=300000
consumer.max.poll.interval.ms=300000
consumer.heartbeat.interval.ms=60000
consumer.max.poll.records=2000
consumer.request.timeout.ms=360000
After starting the connector, it works fine up to a certain point and then we get multiple exceptions:
2017-06-01 06:26:04 ERROR WorkerSinkTask:180 - Commit of WorkerSinkTask{id=elasticsearch-sink-connector-1-3} offsets threw an unexpected exception:
org.apache.kafka.connect.errors.ConnectException: Flush timeout expired with unflushed records: 2706
at io.confluent.connect.elasticsearch.bulk.BulkProcessor.flush(BulkProcessor.java:302)
at io.confluent.connect.elasticsearch.ElasticsearchWriter.flush(ElasticsearchWriter.java:217)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.flush(ElasticsearchSinkTask.java:125)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:275)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:155)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
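From the BulkProcessor stack trace, this flush timeout appears to be governed by the connector's own flush.timeout.ms setting (which, if we read the docs correctly, defaults to 10000 ms) rather than by the consumer properties above, and we have not overridden it. Is raising it along these lines the right direction? (The value below is just a guess.)

```properties
# Hypothetical override: give the bulk processor more time to drain
# buffered records before a commit-time flush is declared failed.
flush.timeout.ms=180000
```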
After a certain point, we also start getting socket timeout exceptions.
2017-06-01 09:53:15 ERROR WorkerSinkTask:390 - Task elasticsearch-sink-connector-1-3 threw an uncaught and unrecoverable exception
org.apache.kafka.connect.errors.ConnectException: java.net.SocketTimeoutException: Read timed out
at io.confluent.connect.elasticsearch.bulk.BulkProcessor.toConnectException(BulkProcessor.java:390)
at io.confluent.connect.elasticsearch.bulk.BulkProcessor.failAndStop(BulkProcessor.java:375)
at io.confluent.connect.elasticsearch.bulk.BulkProcessor.access$100(BulkProcessor.java:41)
at io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.call(BulkProcessor.java:328)
at io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.call(BulkProcessor.java:312)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
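Similarly, the read timeout seems to come from the connector's HTTP client settings (read.timeout.ms / connection.timeout.ms) rather than the consumer request.timeout.ms we already raised. Would something like the following help under bulk load, or should we instead be lowering batch.size so Elasticsearch can keep up? (All values below are guesses on our part.)

```properties
# Hypothetical tuning: allow slower bulk responses from Elasticsearch
# under load, and retry transient failures with more backoff.
read.timeout.ms=60000
connection.timeout.ms=10000
max.retries=10
retry.backoff.ms=1000
```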
Any help is appreciated.
Thanks,
Deepti