Hi,
I am currently running an Ubuntu setup with 30 GB of RAM and 500 GB of local storage. I am running a custom sink Kafka connector that upserts data into a MySQL DB. Below is my connect-distributed.properties. I am running Connect as a Docker container (please see below). I have also modified kafka-run-class.sh to always pick KAFKA_HEAP_OPTS="-Xmx8g", and while running Docker I am passing --memory=8g.
In this environment, around 300,000 records are added to the Kafka topic every 15 minutes, and my sink connector needs to pull them from the topic and upsert them into the MySQL DB.
I am seeing OOM (Java heap space) errors after processing around 75,000 records from the Kafka topic into the MySQL DB.
On the same machine I am running Kafka, Schema Registry, and REST Proxy as containers, with an advertised hostname provided to Kafka.
Please let me know if there is another way to force KAFKA_HEAP_OPTS to 8 GB, and how I can avoid the OOM issues.
kafka-run-class.sh (guard commented out so the override always applies):
# Memory options
#if [ -z "$KAFKA_HEAP_OPTS" ]; then
KAFKA_HEAP_OPTS="-Xmx8g"
#fi
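For reference, the stock guard that was commented out above only assigns a default when the variable is empty, so exporting KAFKA_HEAP_OPTS into the process environment is an alternative to editing the script. A minimal sketch of that logic (the default value shown is the stock one, but it can vary by Kafka version):

```shell
# Mimic the stock kafka-run-class.sh guard: apply a default heap size
# only when KAFKA_HEAP_OPTS was not already exported by the caller.
if [ -z "$KAFKA_HEAP_OPTS" ]; then
  KAFKA_HEAP_OPTS="-Xmx256M"   # stock default; exact value varies by version
fi
echo "KAFKA_HEAP_OPTS=$KAFKA_HEAP_OPTS"
```

Run without the variable set, it prints the default; run with KAFKA_HEAP_OPTS exported, the exported value wins and no script edit is needed.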
docker run --memory=8g --memory-swap=-1 -e JAVA_OPTS='-Xmx8g' -p 8083:8083 -v /data/prod/shared_volume/kafka/kafka-connect-logs:/var/log/kafka/ jdbcconnect
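One thing worth checking: kafka-run-class.sh reads KAFKA_HEAP_OPTS, not JAVA_OPTS, so the -e JAVA_OPTS='-Xmx8g' in the command above is likely ignored by the launcher. A hedged alternative (same image name, port, and volume as above) that passes the variable the script actually checks:

```shell
docker run --memory=8g --memory-swap=-1 \
  -e KAFKA_HEAP_OPTS='-Xms8g -Xmx8g' \
  -p 8083:8083 \
  -v /data/prod/shared_volume/kafka/kafka-connect-logs:/var/log/kafka/ \
  jdbcconnect
```

This only helps if the container's entrypoint forwards the environment through to kafka-run-class.sh; it is also worth verifying inside the container (e.g. via ps) that the running JVM actually shows -Xmx8g, since an edited script baked into a different image layer would not take effect.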
connect-distributed.properties:
bootstrap.servers=<hostname>:9092
group.id=""
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
consumer.max.poll.records=100
consumer.enable.auto.commit=true
consumer.auto.commit.interval.ms=10000
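Separately, consumer.max.poll.records only caps the record count returned per poll; the bytes buffered per fetch are bounded by the consumer's fetch-size settings, which also drive heap usage. A hedged sketch of settings sometimes lowered to reduce memory pressure (the values below are the Kafka defaults, shown for illustration, not tuned for this workload):

```
# Cap bytes fetched per partition and per fetch request (illustrative values)
consumer.max.partition.fetch.bytes=1048576
consumer.fetch.max.bytes=52428800
```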
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "KafkaBasedLog Work Thread - connect-status"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-4"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-3"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-2"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "DistributedHerder"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "qtp667346055-26"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "KafkaBasedLog Work Thread - connect-offsets"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "KafkaBasedLog Work Thread - connect-configs"
Exception in thread "org.eclipse.jetty.server.session.HashSessionManager@7d9d0818Timer" java.lang.OutOfMemoryError: Java heap space
2017-10-13 11:50:52,320 ERROR Task Test-0 threw an uncaught and unrecoverable exception during shutdown (org.apache.kafka.connect.runtime.WorkerTask:123)
java.lang.OutOfMemoryError: Java heap space