ERROR Commit of WorkerSinkTask id + offsets threw an unexpected exception


Raju Divakaran

Jul 11, 2016, 8:44:53 AM
to Confluent Platform

Hello,

Can anyone please help me with this error? It shows up very frequently, and I feel it ends up in a situation where Kafka Connect is just running but not doing its job (moving data from Kafka to HDFS).

[2016-07-11 09:25:10,319] ERROR Error closing writer for topicname. Error: {} (io.confluent.connect.hdfs.DataWriter:321)
[2016-07-11 09:25:11,320] INFO WorkerSinkTask{id=data_platform_high_traffic-8} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:244)
[2016-07-11 09:25:11,327] ERROR Commit of WorkerSinkTask{id=data_platform_high_traffic-8} offsets threw an unexpected exception: (org.apache.kafka.connect.runtime.WorkerSinkTask:180)
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:519)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:201)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:998)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:305)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:222)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


Before this error I had my session timeout at the default value of 30000 ms, and later I changed it to 60000 ms. Along with increasing the session timeout, I bumped up request.timeout.ms as well.

Thank you!

Dustin Cote

Jul 11, 2016, 8:58:12 AM
to confluent...@googlegroups.com
Hi Raju,

It looks like the key part of the error message, the part that explains what is happening, is this:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

I assume you are using the HDFS Connector that ships with CP.  Are you seeing any data making it through to HDFS at all?  The above error implies that the consumer is spending too much time processing messages inside the poll loop, so you may need smaller batches or longer timeouts to keep processing data in your current setup.
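
As a rough back-of-envelope illustration (the per-record time here is an assumption, not a measurement): if each record takes about 10 ms to land in HDFS and a single poll() returns 5,000 records, one pass through the loop is roughly 5,000 x 10 ms = 50 s, which already exceeds a 30 s session.timeout.ms.  Capping the batch at 1,000 records via max.poll.records would bring the same pass down to about 10 s.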

Cheers,

Dustin


Raju Divakaran

Jul 11, 2016, 9:20:28 AM
to Confluent Platform
Hello Dustin,

Thanks for the reply.

Yes, I can see that data is being written to HDFS, but at a very slow pace, in the sense that there is a big backlog of data in Kafka that is yet to be written.

I increased the session timeout value to 120 seconds and request.timeout.ms to 130 seconds. Please see my worker.properties below; it would be nice if you could suggest the modifications that you would make.

$ cat /etc/kafka_connect/worker.properties
# Kafka Connect Worker Properties 

group.id=production_kc_cluster
status.storage.topic=_kafka_connect_status
config.storage.topic=_kafka_connect_storage
offset.storage.topic=_kafka_connect_offset
key.converter=io.confluent.connect.avro.NullKeyConverter
value.converter=io.confluent.connect.avro.AvroConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
rest.port=8083
receive.buffer.bytes=32768
security.protocol=PLAINTEXT
send.buffer.bytes=131072
client.id=data_platform_production
metrics.sum.samples=2

Thanks,
Raju.

Dustin Cote

Jul 13, 2016, 10:45:49 AM
to confluent...@googlegroups.com
Hi Raju,

It seems like you may want to do some consumer tuning in this case if the poll is lasting too long.  There's some documentation here that shows how to override consumer configs within the worker:


You probably want to add consumer.session.timeout.ms with the settings you have already mentioned, or make smaller batches with consumer.max.poll.records, to avoid this error.  You already have session.timeout.ms set, which should be taking effect, so you may just need to increase it further in your environment.  Reducing the number of records per poll might make more sense in your case for optimizing performance if your HDFS performance is a little bit unpredictable.  I don't see anything that makes me think you have a misconfiguration causing the slow performance, so it's likely more of an environment issue.  Hope that helps.
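
To make that concrete, a minimal sketch of what the overrides could look like appended to worker.properties (the timeout values mirror the ones you mentioned; the max.poll.records value is only a placeholder for "a smaller batch"):

# consumer.* settings have the prefix stripped and are passed to the sink task's consumer
consumer.session.timeout.ms=120000
consumer.request.timeout.ms=130000
# placeholder value; tune to whatever your HDFS writes can keep up with
consumer.max.poll.records=500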

-Dustin


Raju Divakaran

Jul 15, 2016, 7:03:22 AM
to Confluent Platform
Thank you, Dustin. I set the max.poll.records value to a lower number and the same error doesn't come up again. If you don't mind, I would like to check whether you can brief me on a few behaviours of Kafka Connect when it shows the following in the log files:

[2016-07-15 02:17:08,296] INFO    Marking the coordinator x.x.x.x:9092 (id: 2147483636 rack: null) dead for group production_kc_cluster (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:542)
[2016-07-15 02:17:08,566] INFO    Discovered coordinator x.x.x.x:9092 (id: 2147483636 rack: null) for group production_kc_cluster. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:505)
[2016-07-15 02:17:09,907] INFO    Marking the coordinator x.x.x.x:9092 (id: 2147483636 rack: null) dead for group production_kc_cluster (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:542)
[2016-07-15 02:17:10,140] INFO    Discovered coordinator x.x.x.x:9092 (id: 2147483636 rack: null) for group production_kc_cluster. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:505)
[2016-07-15 02:17:18,801] INFO    Marking the coordinator x.x.x.x:9092 (id: 2147483636 rack: null) dead for group production_kc_cluster (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:542)
[2016-07-15 02:17:18,801] INFO    Discovered coordinator x.x.x.x:9092 (id: 2147483636 rack: null) for group production_kc_cluster. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:505)
[2016-07-15 02:17:18,802] INFO    Marking the coordinator x.x.x.x:9092 (id: 2147483636 rack: null) dead for group production_kc_cluster (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:542)
[2016-07-15 02:17:18,902] INFO    Discovered coordinator x.x.x.x:9092 (id: 2147483636 rack: null) for group production_kc_cluster. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:505)


I would just like to know what it means when it says it is marking a specific coordinator as dead.

Soon after this has been going on for, say, half an hour, Kafka Connect just seems to sit there answering requests for connectors:


[2016-07-15 10:48:46,781] INFO    x.x.x.x - - [15/Jul/2016:10:48:46 +0000] "GET /connectors HTTP/1.1" 200 502  3 (org.apache.kafka.connect.runtime.rest.RestServer:60)
[2016-07-15 10:48:46,787] INFO    x.x.x.x - - [15/Jul/2016:10:48:46 +0000] "GET /connectors HTTP/1.1" 200 502  1 (org.apache.kafka.connect.runtime.rest.RestServer:60)
[2016-07-15 10:48:46,808] INFO    x.x.x.x - - [15/Jul/2016:10:48:46 +0000] "GET /connectors HTTP/1.1" 200 502  1 (org.apache.kafka.connect.runtime.rest.RestServer:60)

And it doesn't seem to be doing anything.

Thanks very much!

Raju.

Dustin Cote

Jul 19, 2016, 10:10:41 AM
to confluent...@googlegroups.com
Hi Raju,

This looks like a general consumer/client issue rather than a Connect-specific issue.  That said, this can happen when the consumer loses its session because a heartbeat hasn't been sent from the consumer to the broker to keep the session alive.  I would expect this is again related to the poll taking too long in your case, as you were having trouble with this before.  It's more or less the same problem manifesting as a different symptom.  You'll note that the coordinator is dead for only ~100ms before getting discovered again, so you are probably just slightly too long in your poll call, and decreasing max.poll.records slightly more could help avoid this.
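
If it helps to see it written out, that would just be a further (purely hypothetical) tweak to the same override in worker.properties, for example:

# illustrative value only; choose it based on how long your HDFS writes actually take
consumer.max.poll.records=100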

Regards,


Dustin