My source generates records at an expected rate of ten million JSON messages per hour. With flush.size set to 25000, HADOOP_HEAPSIZE set to 12000 MB on the masters and 8096 MB on the slaves, I saw no errors in the logs, but no commit to HDFS ever happened: there was no data under the /topics/topic1 path. I could see records under /topics/+tmp/topic1, but they never got flushed out of +tmp. Given my data generation rate, a flush.size of 25000 should have been met within minutes.
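For reference, my connector configuration was along these lines (a sketch only: the connector name is taken from the logs below, topic2 is a stand-in for my second topic, and hdfs.url and the namenode host are placeholders, not my exact values):

    {
      "name": "run-1-hdfs-sink",
      "config": {
        "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
        "tasks.max": "2",
        "topics": "topic1,topic2",
        "hdfs.url": "hdfs://namenode:8020",
        "topics.dir": "/topics",
        "flush.size": "25000"
      }
    }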
I then changed my connector config, reducing flush.size to 3000, with a command like the one below:
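(A minimal sketch of the call, assuming a distributed worker whose REST interface listens on localhost:8083; the rest of the config is unchanged from the sketch above:)

    curl -X PUT -H "Content-Type: application/json" \
      http://localhost:8083/connectors/run-1-hdfs-sink/config \
      -d '{
            "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
            "tasks.max": "2",
            "topics": "topic1,topic2",
            "hdfs.url": "hdfs://namenode:8020",
            "topics.dir": "/topics",
            "flush.size": "3000"
          }'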
The connector started, read a little data, and then almost immediately threw the Java heap space OutOfMemoryError below:
Exception in thread "kafka-coordinator-heartbeat-thread | connect-run-1-hdfs-sink" java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:93)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:343)
at org.apache.kafka.common.network.Selector.poll(Selector.java:291)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:266)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:865)
...........READ SOME MORE DATA from TOPICS......................
[2017-02-17 16:25:16,612] ERROR Task run-1-hdfs-sink-1 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:93)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:343)
at org.apache.kafka.common.network.Selector.poll(Selector.java:291)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1031)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:317)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:235)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:744)
.............READ SOME MORE DATA from TOPICS................
Right now, data is getting pushed to HDFS for one of the topics but not for the second.
My queries:
- Why did this Java heap space error show up with the small flush.size value and not with the higher one, and why did no HDFS write happen in the latter case?
- Because of this Java heap space issue, I have almost certainly lost some of the records that were generated in the meantime. How can I overcome this error?
- The error occurred on both Kafka Connect nodes, so there must be a common configuration fix. How do I apply it? (See the sketch after this list for where I assume such a setting would go.)
- Is there a maximum flush.size value I can use that will not throw this error and will still push data to HDFS?
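On the third query: my assumption (not yet verified) is that the Connect worker's JVM heap is controlled by KAFKA_HEAP_OPTS, which kafka-run-class.sh picks up, so the common fix would look roughly like this on each Connect node (paths follow the standard Kafka layout; the heap sizes are guesses):

    # Assumed fix, not verified: raise the Connect worker heap before starting it.
    export KAFKA_HEAP_OPTS="-Xms512M -Xmx6G"
    bin/connect-distributed.sh config/connect-distributed.properties

If that is right, another lever might be the embedded consumer's fetch settings (e.g. consumer.max.partition.fetch.bytes in the worker properties), since the allocation failing in the stack traces above is the network receive buffer.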
Thanks
Nishant Verma