Multiple exceptions while running kafka-connect-elasticsearch - bulk load

Deepti Antony

Jun 1, 2017, 7:19:57 AM
to Confluent Platform
Hi Team,

We are using the Kafka Connect Elasticsearch sink connector to write Avro data from a Kafka topic into an Elasticsearch index. With a minimal load, such as manually pushing a few records, the connector works fine and the index is updated as expected.

However, during bulk loads we are getting multiple exceptions. We have tried updating the configuration, but the connector still breaks at some point.

Our Elasticsearch sink connector configuration is as follows:
  type.name: kafka-connect,
  tasks.max: 5,
  batch.size: 500,
  key.ignore: true,
  retry.backoff.ms: 10000,
  max.buffered.records: 2000,
  schema.ignore: true,
  flush.timeout.ms: 90000,
  linger.ms: 60
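(For completeness, the full connector configuration we submit looks roughly like the properties file below; the topic name and Elasticsearch URL are placeholders, the remaining values match those above.)

name=elasticsearch-sink-connector-1
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
# placeholders -- the real topic name and Elasticsearch URL differ
topics=our-avro-topic
connection.url=http://localhost:9200
type.name=kafka-connect
tasks.max=5
batch.size=500
key.ignore=true
retry.backoff.ms=10000
max.buffered.records=2000
schema.ignore=true
flush.timeout.ms=90000
linger.ms=60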




And the connector worker properties have been updated as below:
consumer.session.timeout.ms=300000
consumer.max.poll.interval.ms=300000
consumer.heartbeat.interval.ms=60000
consumer.max.poll.records=2000
consumer.request.timeout.ms=360000
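(These consumer.* overrides sit in the worker's properties file, e.g. connect-distributed.properties, alongside the usual worker settings, and are passed through to the sink tasks' consumers. A trimmed sketch, with placeholder broker and group values:)

# placeholder worker basics
bootstrap.servers=localhost:9092
group.id=connect-cluster

# consumer.* entries are forwarded to the sink connector's consumers
consumer.session.timeout.ms=300000
consumer.max.poll.interval.ms=300000
consumer.heartbeat.interval.ms=60000
consumer.max.poll.records=2000
consumer.request.timeout.ms=360000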


After starting the connector, everything works fine until a certain point, and then we get multiple exceptions:

2017-06-01 06:26:04 ERROR WorkerSinkTask:180 - Commit of WorkerSinkTask{id=elasticsearch-sink-connector-1-3} offsets threw an unexpected exception:
org.apache.kafka.connect.errors.ConnectException: Flush timeout expired with unflushed records: 2706
        at io.confluent.connect.elasticsearch.bulk.BulkProcessor.flush(BulkProcessor.java:302)
        at io.confluent.connect.elasticsearch.ElasticsearchWriter.flush(ElasticsearchWriter.java:217)
        at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.flush(ElasticsearchSinkTask.java:125)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:275)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:155)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)


After a certain point, we start getting socket timeout exceptions.

2017-06-01 09:53:15 ERROR WorkerSinkTask:390 - Task elasticsearch-sink-connector-1-3 threw an uncaught and unrecoverable exception
org.apache.kafka.connect.errors.ConnectException: java.net.SocketTimeoutException: Read timed out
        at io.confluent.connect.elasticsearch.bulk.BulkProcessor.toConnectException(BulkProcessor.java:390)
        at io.confluent.connect.elasticsearch.bulk.BulkProcessor.failAndStop(BulkProcessor.java:375)
        at io.confluent.connect.elasticsearch.bulk.BulkProcessor.access$100(BulkProcessor.java:41)
        at io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.call(BulkProcessor.java:328)
        at io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.call(BulkProcessor.java:312)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)


Any help is appreciated.

Thanks,
Deepti 

Deepti Antony

Jun 8, 2017, 1:23:31 AM
to Confluent Platform
Hello,

So I took a look at the connector code and saw that it uses the Jest client for Elasticsearch writes. I added a read timeout of 60 seconds (the default is 3 seconds), and with that change the SocketTimeoutException no longer occurs; a rough sketch of the change is below. However, we are still getting the flush timeout issue. Does anyone know how to resolve that? It would be really helpful.
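For reference, this is roughly what the change looks like: a minimal sketch of building the Jest client with a larger read timeout via Jest's HttpClientConfig.Builder. The class name and URL are placeholders, not the connector's actual code; the connector wires its client up from connection.url at task start.

import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.config.HttpClientConfig;

public class JestTimeoutSketch {

    // Build a Jest client with a 60 s socket read timeout instead of Jest's 3 s default.
    static JestClient buildClient(String connectionUrl) {
        JestClientFactory factory = new JestClientFactory();
        factory.setHttpClientConfig(new HttpClientConfig.Builder(connectionUrl)
                .multiThreaded(true)
                .connTimeout(3000)    // connection establishment timeout (ms)
                .readTimeout(60000)   // socket read timeout (ms) -- this is the change
                .build());
        return factory.getObject();
    }
}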

Thanks in advance. 

Regards,
Deepti

迈克尔丹

Apr 23, 2019, 5:09:09 AM
to Confluent Platform
Can you share the details of how you solved this problem?

On Thursday, June 8, 2017 at 1:23:31 PM UTC+8, Deepti Antony wrote:

迈克尔丹

Apr 23, 2019, 5:15:16 AM
to Confluent Platform


On Thursday, June 1, 2017 at 7:19:57 PM UTC+8, Deepti Antony wrote: