OOM exceptions for custom kafka connect


Arun Shankar

Oct 13, 2017, 9:34:28 AM
to Confluent Platform
Hi,

I currently have an Ubuntu machine with 30 GB of RAM and 500 GB of local storage. I am running a custom sink Kafka connector that upserts data into a MySQL DB. My connect-distributed properties are below. I run Connect as a Docker container (please see below). I have also modified kafka-run-class to always set KAFKA_HEAP_OPTS="-Xmx8g", and I pass --memory=8g when running Docker.

In this environment, around 300,000 records are added to the Kafka topic every 15 minutes, and my sink connector needs to pull them from the topic and upsert them into the MySQL DB.

I am seeing OOM (Java heap space) errors after processing around 75,000 records from the Kafka topic into the MySQL DB.

On the same machine I am running Kafka, Schema Registry, and REST Proxy as containers, with an advertised hostname configured for Kafka.

Please let me know if there is another way to force KAFKA_HEAP_OPTS to 8 GB, and how I can avoid these OOM issues.

kafka-run-class:

# Memory options
#if [ -z "$KAFKA_HEAP_OPTS" ]; then
KAFKA_HEAP_OPTS="-Xmx8g"
#fi



docker run --memory=8g --memory-swap=-1 -e JAVA_OPTS='-Xmx8g' -p 8083:8083 -v /data/prod/shared_volume/kafka/kafka-connect-logs:/var/log/kafka/ jdbcconnect
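One thing worth noting: the stock kafka-run-class script reads KAFKA_HEAP_OPTS, not JAVA_OPTS, so the -e JAVA_OPTS='-Xmx8g' flag above may never reach the JVM unless the image's entrypoint forwards it. A sketch of passing the heap setting as the variable the script actually reads (assuming the jdbcconnect image passes its environment through to kafka-run-class; the flag values are illustrative):

```shell
docker run --memory=8g --memory-swap=-1 \
  -e KAFKA_HEAP_OPTS='-Xmx8g' \
  -p 8083:8083 \
  -v /data/prod/shared_volume/kafka/kafka-connect-logs:/var/log/kafka/ \
  jdbcconnect
```

This avoids editing kafka-run-class inside the image, so upgrades do not silently revert the heap setting.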


connect-distributed.properties:

bootstrap.servers=<hostname>:9092
group.id=""
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
consumer.max.poll.records=100
consumer.enable.auto.commit=true
consumer.auto.commit.interval.ms=10000
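One knob worth checking at this ingest rate is how many bytes each poll can buffer: max.poll.records caps the record count returned per poll, but not the bytes the consumer fetches and holds on the heap. A sketch of worker-level consumer overrides that bound fetch memory (the byte values below are illustrative, not tuned for this workload):

```
# Cap the total bytes the consumer buffers per fetch request (broker default is ~50 MB)
consumer.fetch.max.bytes=10485760
# Cap the bytes buffered per partition per fetch
consumer.max.partition.fetch.bytes=1048576
```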


Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "KafkaBasedLog Work Thread - connect-status"
 
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-4"
 
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-3"
 
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-2"
 
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "DistributedHerder"
 
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "qtp667346055-26"
 
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "KafkaBasedLog Work Thread - connect-offsets"
 
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "KafkaBasedLog Work Thread - connect-configs"
Exception in thread "org.eclipse.jetty.server.session.HashSessionManager@7d9d0818Timer" java.lang.OutOfMemoryError: Java heap space
2017-10-13 11:50:52,320 ERROR   Task Test-0 threw an uncaught and unrecoverable exception during shutdown   (org.apache.kafka.connect.runtime.WorkerTask:123)
java.lang.OutOfMemoryError: Java heap space

foreve...@hotmail.com

Sep 7, 2018, 6:37:34 AM
to Confluent Platform
Hi there, I am encountering the same problem. How did you fix it?

On Friday, October 13, 2017 at 9:34:28 PM UTC+8, Arun Shankar wrote:

Arun Shankar

Sep 7, 2018, 7:07:20 AM
to Confluent Platform
This generally happens when the connector code has a resource leak. Revisit your code with SonarQube or a similar static-analysis tool and look for resources that are opened but never closed.
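To make the leak pattern concrete, here is a hypothetical sketch (class and method names are invented, not from the original connector) of the mistake such tools flag in sink tasks: a per-batch resource, standing in for a JDBC statement or connection, that is allocated on every put() but never closed, so instances pile up on the heap until the worker throws OutOfMemoryError. try-with-resources fixes it:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class LeakSketch {
    // Counts resources currently open, so the leak is observable.
    static final AtomicInteger OPEN = new AtomicInteger();

    // Stands in for a JDBC PreparedStatement or Connection.
    static class Resource implements AutoCloseable {
        Resource() { OPEN.incrementAndGet(); }
        void upsert(String record) { /* write the record to the DB */ }
        @Override public void close() { OPEN.decrementAndGet(); }
    }

    // Leaky put(): the Resource is abandoned after each batch and
    // accumulates until the heap is exhausted.
    static void leakyPut(String record) {
        Resource stmt = new Resource();
        stmt.upsert(record);            // never closed
    }

    // Fixed put(): try-with-resources guarantees close() per batch,
    // even if upsert() throws.
    static void safePut(String record) {
        try (Resource stmt = new Resource()) {
            stmt.upsert(record);
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 1000; i++) leakyPut("r" + i);
        System.out.println("open after leakyPut: " + OPEN.get());
        OPEN.set(0);
        for (int i = 0; i < 1000; i++) safePut("r" + i);
        System.out.println("open after safePut: " + OPEN.get());
    }
}
```

With 300,000 records every 15 minutes, even a small per-batch leak exhausts an 8 GB heap quickly, which matches the OOM appearing only after tens of thousands of records. A heap dump (-XX:+HeapDumpOnOutOfMemoryError) will show which class dominates.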