Stuck connectors unable to send messages

1,621 views
Skip to first unread message

Zach Walker

unread,
Nov 20, 2021, 12:24:13 PM11/20/21
to debezium
Hello, 

I have been working on this problem for a few weeks and have run out of ideas.  See a description of our setup at the bottom.   I don't know if this is necessarily a debezium specific problem, but that is the only producing connector we are running.   I am hoping this community has the experience to help debug it.   

We see an endless repetition of the same error messages like below for a handful of the debezium connectors.  We have seen this state occur both during snapshoting and during bin log following.  We have also seen it happen on one but not all of the connectors running on a worker.  We can provide details of settings, jmx metrics, and logs that would be useful for helping to understand the problem.

Setting the log level of the WorkerSourceTask to TRACE does not give any additional log messages.  This state can go on for several days without resolving itself.  Restarting the connectors has been our only way to resolve the problem.

Notice:
1. There are 2 debezium tasks running on this worker
2. The number of messages outstanding for each connector is staying the same at each error message.

[2021-11-20 16:36:22,566] ERROR [prod659-debezium-20211119|task-0|offsets] WorkerSourceTask{id=prod659-debezium-20211119-0} Failed to commit offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter)
[2021-11-20 16:36:22,566] INFO [prod214-debezium-20211119|task-0|offsets] WorkerSourceTask{id=prod214-debezium-20211119-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2021-11-20 16:36:22,566] INFO [prod214-debezium-20211119|task-0|offsets] WorkerSourceTask{id=prod214-debezium-20211119-0} flushing 4039 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2021-11-20 16:37:22,567] ERROR [prod214-debezium-20211119|task-0|offsets] WorkerSourceTask{id=prod214-debezium-20211119-0} Failed to flush, timed out while waiting for producer to flush outstanding 4039 messages (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2021-11-20 16:37:22,567] ERROR [prod214-debezium-20211119|task-0|offsets] WorkerSourceTask{id=prod214-debezium-20211119-0} Failed to commit offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter)
[2021-11-20 16:37:22,567] INFO [prod659-debezium-20211119|task-0|offsets] WorkerSourceTask{id=prod659-debezium-20211119-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2021-11-20 16:37:22,567] INFO [prod659-debezium-20211119|task-0|offsets] WorkerSourceTask{id=prod659-debezium-20211119-0} flushing 71718 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2021-11-20 16:38:22,567] ERROR [prod659-debezium-20211119|task-0|offsets] WorkerSourceTask{id=prod659-debezium-20211119-0} Failed to flush, timed out while waiting for producer to flush outstanding 71718 messages (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2021-11-20 16:38:22,567] ERROR [prod659-debezium-20211119|task-0|offsets] WorkerSourceTask{id=prod659-debezium-20211119-0} Failed to commit offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter)
[2021-11-20 16:38:22,567] INFO [prod214-debezium-20211119|task-0|offsets] WorkerSourceTask{id=prod214-debezium-20211119-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2021-11-20 16:38:22,567] INFO [prod214-debezium-20211119|task-0|offsets] WorkerSourceTask{id=prod214-debezium-20211119-0} flushing 4039 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2021-11-20 16:39:22,567] ERROR [prod214-debezium-20211119|task-0|offsets] WorkerSourceTask{id=prod214-debezium-20211119-0} Failed to flush, timed out while waiting for producer to flush outstanding 4039 messages (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2021-11-20 16:39:22,567] ERROR [prod214-debezium-20211119|task-0|offsets] WorkerSourceTask{id=prod214-debezium-20211119-0} Failed to commit offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter)
[2021-11-20 16:39:22,567] INFO [prod659-debezium-20211119|task-0|offsets] WorkerSourceTask{id=prod659-debezium-20211119-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2021-11-20 16:39:22,567] INFO [prod659-debezium-20211119|task-0|offsets] WorkerSourceTask{id=prod659-debezium-20211119-0} flushing 71718 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2021-11-20 16:40:22,567] ERROR [prod659-debezium-20211119|task-0|offsets] WorkerSourceTask{id=prod659-debezium-20211119-0} Failed to flush, timed out while waiting for producer to flush outstanding 71718 messages (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2021-11-20 16:40:22,567] ERROR [prod659-debezium-20211119|task-0|offsets] WorkerSourceTask{id=prod659-debezium-20211119-0} Failed to commit offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter)
[2021-11-20 16:40:22,568] INFO [prod214-debezium-20211119|task-0|offsets] WorkerSourceTask{id=prod214-debezium-20211119-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2021-11-20 16:40:22,568] INFO [prod214-debezium-20211119|task-0|offsets] WorkerSourceTask{id=prod214-debezium-20211119-0} flushing 4039 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2021-11-20 16:41:22,568] ERROR [prod214-debezium-20211119|task-0|offsets] WorkerSourceTask{id=prod214-debezium-20211119-0} Failed to flush, timed out while waiting for producer to flush outstanding 4039 messages (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2021-11-20 16:41:22,568] ERROR [prod214-debezium-20211119|task-0|offsets] WorkerSourceTask{id=prod214-debezium-20211119-0} Failed to commit offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter)
[2021-11-20 16:41:22,568] INFO [prod659-debezium-20211119|task-0|offsets] WorkerSourceTask{id=prod659-debezium-20211119-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2021-11-20 16:41:22,568] INFO [prod659-debezium-20211119|task-0|offsets] WorkerSourceTask{id=prod659-debezium-20211119-0} flushing 71718 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2021-11-20 16:42:22,568] ERROR [prod659-debezium-20211119|task-0|offsets] WorkerSourceTask{id=prod659-debezium-20211119-0} Failed to flush, timed out while waiting for producer to flush outstanding 71718 messages (org.apache.kafka.connect.runtime.WorkerSourceTask)

Our setup is as follows
Debezium 1.6 using the mysql plugin
Kafaka Connect running via cp-helm-chart on EKS
  • version - 6.1.1-ccs
  • commit sha: c209f70c6c2e52ae
  • connect workers - 96
    • RAM - 32GB per worker
    • Max Heap 28 GB per worker
    • Attempting to run no more than 2 debezium tasks per worker
  • connectors
    • debezium - ~150
      • Source Databases
        • 200- 300 databases
        • 150,000 tables across all databases
        • 1,500,000 columns across all databases
    • other - snowflake (~30 tasks)
Kafka (2.7.0) running in MSK


Zach Walker

unread,
Nov 20, 2021, 6:01:09 PM11/20/21
to debezium
A few more things worth mentioning.
  • According to the REST API, the debezium connectors and their tasks from the logs above are still in the RUNNING state
  • No exceptions have been produced since the connectors were started

Gunnar Morling

unread,
Nov 23, 2021, 6:32:59 AM11/23/21
to debezium
Hey Zach,

Those are impressive numbers in terms of numbers of connectors, tables, etc.

On the issue itself, this may indicate a connectivity issue between Connect and the Kafka cluster; I would expect though that this resolves itself eventually. Or, perhaps an issue related to bursty load, there's interesting two-part write-up on this topic here:


Perhaps experimenting with the settings described in this post can help to mitigate the situation?

Best,

--Gunnar

Zach Walker

unread,
Nov 23, 2021, 5:55:55 PM11/23/21
to debe...@googlegroups.com
Gunnar,

Thanks for your response.  My team had already come across those articles and we spent a few days making sure we could extract the various metrics described.  

In the end we found that though we could reduce the number of requests made to Kafka using the described tuning tweaks, the problem we observe is not quite the same.  

Though the author of that blog post does not explicitly say so, I have inferred that thought the offset commit was failing, the connect worker was still able to send messages and once the message queue was flushed the offset commits would succeed.  In other words, the authors cdc pipeline was still flowing but ours is not.

You may be right that the worker eventually recovers but if that is the case, it seems like I should be able to find some log output or metric that would point me to where the pipeline is stuck.

Do you have any suggestions for specific metrics we could look at to understand the backup?  Or, any suggestions on log levels to increase?

Zach

--
You received this message because you are subscribed to a topic in the Google Groups "debezium" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/debezium/B0-HIJk0VHo/unsubscribe.
To unsubscribe from this group and all its topics, send an email to debezium+u...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/debezium/a2d11efa-64c4-4f58-a361-0181e4e476b7n%40googlegroups.com.
--
Zach Walker

​Staff Software Engineer | AppFolio
------------------------------

Find Us Online -- 

Zach Walker

unread,
Dec 2, 2021, 12:21:08 AM12/2/21
to debe...@googlegroups.com
Hi All,

I wanted to post an update on this and what we found to be the resolution of the issue in our case.   Our problem and solution was different from any others we have seen people write up.  The following solution and summary were put together by Brandon Stanley on our data engineering team here at Appfolio.

We were able to resolve the flush timeout issue by increasing the following Kafka Connect configuration: status.storage.partitions from 5 to 25 (https://docs.confluent.io/platform/current/installation/configuration/connect/index.html#connectconfigs_status.storage.partitions).

Here are the steps we took which prompted us to update the value:

We observed various metrics for our connectors and noticed that the buffer-available-bytes metric for all of our connectors that were stuck had reached 0 but never recovered.  At first glance this was somewhat expected since we could see that data was backing up though it seemed like a symptom of the problem, and not the cause.

As per the documentation, buffer.memory is: 

"The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server the producer will block for max.block.ms after which it will throw an exception.

This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since not all memory the producer uses is used for buffering. Some additional memory will be used for compression (if compression is enabled) as well as for maintaining in-flight requests."

We set the logging level to TRACE for the connectors for all producer loggers to see if we could find any exceptions that were being thrown related to the buffer memory. 

This was done using the following command:

curl -s -X PUT -H "Content-Type:application/json" http://localhost:8083/admin/loggers/org.apache.kafka.clients.producer -d '{"level": "TRACE"}' | jq '.'

Then, we noticed the following message within the logs: 

[2021-11-23 00:48:10,071] DEBUG [Producer clientId=producer-2] Exception occurred during message send: (org.apache.kafka.clients.producer.KafkaProducer) org.apache.kafka.clients.producer.BufferExhaustedException: Failed to allocate memory within the configured max blocking time 60000 ms.

This made us notice that the buffer available bytes were running out not only for our debezium producer but also for "producer-2" which we have learned is a kafka connect producer that writes the connector status to a special kafka connect topic used for coordinating connector status across workers and signaling what topics a connector may be using. Upon discovery, we initially wanted to increase the available buffer memory for producer-2 but were unsuccessful in this attempt. It seems that there was no way to override the producer.buffer memory value for that specific producer. 

Instead, we looked for other configurations that we had the ability to modify. We looked at the associated configurations for kafka connect and found that the number of partitions defaults to 5 based on this configuration.

Given the number of connectors/tables/topics for our use case, 5 partitions seemed too small. We decided to increase the number of partitions to 25 (based on the default configuration value for another internally managed topic: offset.storage.partitons). Once we increased the number of partitions to 25, both Failed to commit offsets and BufferExhaustedException messages were no longer thrown. In summary, our problem seems to have been that kafka connect needed further partitioning of the status storage topic to keep up when running as many workers and connectors as we are in the same worker group. After restarting all our connectors from scratch, we have not seen the same problem reoccur again.

Lingering questions and suggestions (For kafka connect, not specific to debezium)::

1. Why does the org.apache.kafka.connect.runtime.WorkerSourceTask execute loop produce no log output in this situation even while in TRACE?  It's not clear to us what code path is being executed or where it is blocked.

2. The names of producer-1, producer-2, producer-3 would be more helpful if they included something about their function.

With this problem behind us, we will now be looking into improving our snapshotting speed at scale, It appears we see a significant slowdown of snapshotting when running all of our connectors vs a single connector.  We may post again for help when we better understand what parameters we might tweak.  If anyone has suggestions in the meantime we welcome any wisdom.

Hope this helps someone else.

On Tue, Nov 23, 2021 at 3:33 AM 'Gunnar Morling' via debezium <debe...@googlegroups.com> wrote:
--
You received this message because you are subscribed to a topic in the Google Groups "debezium" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/debezium/B0-HIJk0VHo/unsubscribe.
To unsubscribe from this group and all its topics, send an email to debezium+u...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/debezium/a2d11efa-64c4-4f58-a361-0181e4e476b7n%40googlegroups.com.
Reply all
Reply to author
Forward
0 new messages