Kafka Connect WakeupException while rebalancing


dmaity

Jul 26, 2017, 9:55:10 PM
to Confluent Platform

Hi,
I am using the Kafka Connect framework (v0.10.0) in distributed mode to create a scalable microservice consumer application. Basically, I have embedded Kafka Connect into a microservice which dumps data to a downstream service. It's deployed on a Kubernetes cluster where it is autoscaled based on CPU percentage.

While testing with a large load of around 10 million records, I started observing a lot of WakeupExceptions whenever the Kafka Connect workers try to rebalance due to autoscaling.

I also observed a long pause before the Kafka Connect cluster starts processing again.

My question is: can this lead to possible record loss due to offset commit problems? How big a concern is this? If it is an issue, please let me know whether it's fixed in the latest version.

ERROR 82 --- [pool-6-thread-5] o.a.k.connect.runtime.WorkerSinkTask : Commit of WorkerSinkTask{id=test-connector} offsets threw an unexpected exception:
org.apache.kafka.common.errors.WakeupException: null
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:367)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:361)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:404)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1058)
at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit(WorkerSinkTask.java:247)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:293)
at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:421)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:146)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Ewen Cheslack-Postava

Jul 27, 2017, 2:01:30 AM
to Confluent Platform
This sounds like it might just be normal rebalancing; http://docs.confluent.io/current/connect/concepts.html#task-rebalancing gives a quick explanation of what is going on. The autoscaling is likely adding more workers, which in turn causes rebalancing of work. In order to rebalance, Connect has to pause work, flush outstanding data, recompute where different tasks will run, and then restart them. This is probably the pause you are seeing.

This absolutely should not result in any data loss. Your logs don't indicate which connector you're using, but by default, connectors provide at-least-once semantics, meaning you would, at worst, see duplicates. Depending on the connector, some (such as the HDFS, S3, and Elasticsearch connectors) can provide exactly-once semantics. Even if offset commits time out, you should not lose data.

https://issues.apache.org/jira/browse/KAFKA-5505 has been filed and relates to this. It's probably too optimistic in how we could reduce the global impact/cost of rebalances, but I think there are plenty of improvements we could make before entirely rethinking how these rebalances work.

-Ewen


dmaity

Jul 27, 2017, 2:46:23 AM
to Confluent Platform
Thanks much for your reply. Actually, I am not using any available connector but wrote my own custom connector by extending SinkConnector and SinkTask. Basically, I use the SinkTask#put method to get the SinkRecords and hand them to the downstream service for processing. I also throw a ConnectException in case of a downstream failure in the same put method, which I understand puts that particular task into the FAILED state. I am OK with getting duplicate records, but not with data loss.

I also see that there was a fix for commit failures during rebalance: https://github.com/apache/kafka/commit/2c9796114d0a9638be79b4165d0096c7a63babe7
But this fix is not available in 0.10.0.0. Could this be a possible cause of data loss?

Also, is there a way to tell Connect to reduce the pause time before it starts processing again?

Ewen Cheslack-Postava

Jul 31, 2017, 12:01:53 PM
to Confluent Platform
On Wed, Jul 26, 2017 at 11:46 PM, dmaity <debra...@gmail.com> wrote:
Thanks much for your reply. Actually, I am not using any available connector but wrote my own custom connector by extending SinkConnector and SinkTask. Basically, I use the SinkTask#put method to get the SinkRecords and hand them to the downstream service for processing. I also throw a ConnectException in case of a downstream failure in the same put method, which I understand puts that particular task into the FAILED state. I am OK with getting duplicate records, but not with data loss.

This sounds right. You can also use a RetriableException, which would allow the framework to retry delivery of the batch of messages that failed. As long as you always either a) entirely accept and process the full set of messages in a put() or b) throw one of those two exceptions, then, barring a bug in your connector implementation, we would not expect to ever see data loss.
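
For illustration, here is a minimal sketch of that contract. DownstreamClient and DownstreamTimeoutException are hypothetical stand-ins for your downstream service's client, not Kafka Connect classes:

    import java.util.Collection;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.connect.errors.ConnectException;
    import org.apache.kafka.connect.errors.RetriableException;
    import org.apache.kafka.connect.sink.SinkRecord;
    import org.apache.kafka.connect.sink.SinkTask;

    public class DownstreamSinkTask extends SinkTask {

        // Hypothetical client for the downstream service -- not a Kafka Connect class.
        private DownstreamClient client;

        @Override
        public void start(Map<String, String> props) {
            client = new DownstreamClient(props.get("downstream.url"));
        }

        @Override
        public void put(Collection<SinkRecord> records) {
            try {
                // Either the whole batch is processed, or put() throws.
                for (SinkRecord record : records) {
                    client.send(record);
                }
            } catch (DownstreamTimeoutException e) {
                // Transient failure: the framework redelivers the same batch,
                // so at worst you see duplicates, never loss.
                throw new RetriableException(e);
            } catch (Exception e) {
                // Unrecoverable failure: the task moves to FAILED without
                // committing offsets for the unprocessed records.
                throw new ConnectException(e);
            }
        }

        @Override
        public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
            // The downstream call in put() is synchronous in this sketch,
            // so there is nothing extra to flush before offsets are committed.
        }

        @Override
        public void stop() {
            client.close();
        }

        @Override
        public String version() {
            return "0.1.0";
        }
    }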
 

I also see that there was a fix for commit failures during rebalance: https://github.com/apache/kafka/commit/2c9796114d0a9638be79b4165d0096c7a63babe7
But this fix is not available in 0.10.0.0. Could this be a possible cause of data loss?

That bug would result in more duplicates than necessary, not data loss.
 

Also, is there a way to tell Connect to reduce the pause time before it starts processing again?

Which pause time are you referring to?

-Ewen
 

dmaity

Aug 2, 2017, 1:55:21 AM
to Confluent Platform
Hi,

The "pause time" I am referring to is the time taken by the worker nodes (with the same group.id) to rebalance and restart processing records.

I am also observing that with the settings below, the Connect framework (0.10.0.0) takes a good amount of time to start processing records. I think it has to do with the heartbeat interval or the session timeout value.
Here are the values for some of the parameters I am setting in the worker configuration:

    offset.flush.interval.ms: 50000
    offset.flush.timeout.ms: 5000

    consumer.max.poll.records: 10000

    consumer.session.timeout.ms: 300000
    consumer.request.timeout.ms: 310000

    heartbeat.interval.ms: 60000
    session.timeout.ms: 200000

    request.timeout.ms: 310000

On the other hand, if I keep the values at their defaults with max.poll.records=10000, I get a lot of CommitFailedExceptions:

Commit of WorkerSinkTask{id=test-connector} offsets threw an unexpected exception:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
at 

Thanks,
Debraj

Ewen Cheslack-Postava

Aug 2, 2017, 3:52:09 PM
to Confluent Platform
The issue is the heartbeat interval. For events like workers joining or leaving the group, the members of the group detect that they need to rebalance via heartbeats. If heartbeats are only sent every 60s, it can take up to 60s for a worker to detect that it needs to rebalance, and only then does it start the rebalance process, which includes flushing data and stopping all tasks. That means some workers that happen to detect the rebalance earlier could stop work for up to 60s waiting for the rest of the members to re-join the group.
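
For example, keeping the worker-level group settings close to their defaults would let membership changes be detected within a few seconds (these values are illustrative, not tuned recommendations):

    heartbeat.interval.ms: 3000
    session.timeout.ms: 30000

The consumer.* settings for the sink tasks can stay higher if processing is slow; the worker-level settings above govern how quickly the Connect worker group itself notices membership changes.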

You could safely reduce max.poll.records to avoid the CommitFailedException. If the processing of records is relatively slow, you could definitely see timeouts due to processing 10000 records taking longer than the poll interval. Note that the message can be a bit misleading, though: the commit may actually have succeeded after the message was logged; you get the message because the check for whether the commit timed out happens before the next poll() call that would handle the commit response and invoke the callback that code is waiting for.
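
For example (an illustrative value, to be tuned against your actual per-record processing time):

    consumer.max.poll.records: 500

With the 0.10.0.x consumer, heartbeats are sent from the poll loop itself, so each batch handed to put() has to be fully processed within session.timeout.ms; smaller batches keep the gap between poll() calls short.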

-Ewen
