What should be the ideal value of tasks.max?


Nishant Verma

Feb 17, 2017, 2:09:22 AM
to Confluent Platform
Hi 

I have a cluster with three data nodes and two name nodes. Cluster capacity is 500 GB, and each node has 32 GB of RAM.
My Kafka Connect workers run in distributed mode on 2 nodes.
My Kafka broker cluster has 3 nodes.

My source is generating small JSON records, which are dumped onto HDFS via the Kafka Connect HDFS sink.

I have changed the heap size in my hadoop-env.sh to 12G for the masters and 8G for the slaves, as I was getting a Java heap space OutOfMemoryError. I am yet to test after applying these heap size changes.
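For reference, this is roughly the hadoop-env.sh change (a sketch; the exact variable name depends on the Hadoop version, and on Hadoop 2.x HADOOP_HEAPSIZE is given in MB):

export HADOOP_HEAPSIZE=12288    # hadoop-env.sh on the master nodes (12G)
export HADOOP_HEAPSIZE=8192     # hadoop-env.sh on the slave/data nodes (8G)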


I have kept flush.size at 3000 to ensure that very small JSON files are not dumped into HDFS. Earlier, with flush.size at 50, files of only a few bytes were getting dumped. After increasing it to 3000, files of around 2 MB were getting dumped. We will further increase it to 25000 so that larger files go to HDFS. I have set the below properties in connect-distributed.properties:

max.poll.records=500
enable.auto.commit=true

What should be the ideal value of tasks.max for my scenario? Currently it is 14. My source generates 50,000 small JSON records per minute. I want to run Kafka Connect on these 2 nodes overnight, daily, for about 2 weeks.

Thanks
Nishant Verma  

Nishant Verma

Feb 17, 2017, 6:42:37 AM
to Confluent Platform
Edit1:

With flush.size as 25000:

My source generates records at an expected rate of ten million JSONs per hour. With flush.size at 25000, HADOOP_HEAPSIZE at 12000 MB for the masters and 8096 MB for the slaves, I am not seeing any errors in the logs, but no commits are happening to HDFS. There was no data in the /topics/topic1 path, although I could see records in /topics/+tmp/topic1; they never got flushed out of +tmp. A flush.size of 25000 should have been reached within minutes, given my data generation rate.
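Rough arithmetic behind that expectation (a sketch, assuming flush.size is counted per topic partition and a hypothetical 14 partitions):

# ~10,000,000 records/hour ≈ 2,778 records/second spread across the topic's partitions
echo $(( 25000 * 14 * 3600 / 10000000 ))   # ≈ 126 seconds for one partition to reach flush.size=25000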

With flush.size as 3000:

I changed the config of my connector and reduced flush.size to 3000 using the below command:
curl -H "Content-Type: application/json" -X PUT -d '{"connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector", "format.class": "com.qubole.streamx.SourceFormat", "tasks.max": "14", "hdfs.url": "hdfs://ip-10-16-37-124:9000", "topics": "topic1,topic2", "partitioner.class": "io.confluent.connect.hdfs.partitioner.DailyPartitioner", "locale": " en.UTF-8", "flush.size": "3000", "timezone": "Asia/Calcutta" }' http://localhost:8083/connectors/run-1-hdfs-sink/config

The connector started, began reading some data, and immediately threw the Java heap space OutOfMemoryError below:

Exception in thread "kafka-coordinator-heartbeat-thread | connect-run-1-hdfs-sink" java.lang.OutOfMemoryError: Java heap space
        at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
        at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
        at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:93)
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
        at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154)
        at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135)
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:343)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:291)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:266)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:865)

...........READ SOME MORE DATA from TOPICS......................

[2017-02-17 16:25:16,612] ERROR Task run-1-hdfs-sink-1 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
java.lang.OutOfMemoryError: Java heap space
        at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
        at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
        at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:93)
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
        at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154)
        at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135)
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:343)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:291)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1031)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:317)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:235)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:744)


.............READ SOME MORE DATA from TOPICS................

I can see that data is getting pushed to HDFS for one topic as of now, but not for the second.

My queries:
- Why did this Java heap space error occur with a small value of flush.size and not with higher values, and why did the HDFS write not happen in the latter case?
- Because of this Java heap space issue, I would certainly have lost some records that were generated in the meantime. How do I overcome this Java heap space error?
- This Java heap space error occurred on both of the Kafka Connect nodes, so there must be some common configuration fix for it. How do I apply it?
- Is there a limit on the highest value I can use for flush.size that would not throw this error and would still push data to HDFS?

Thanks
Nishant Verma

Nishant Verma

Feb 20, 2017, 4:41:45 AM
to Confluent Platform
Hi

Can anyone suggest something here? I am getting the same Java heap space error. I changed HADOOP_CLIENT_OPTS to HADOOP_CLIENT_OPTS="-Xmx12000m $HADOOP_CLIENT_OPTS" in hadoop-env.sh, but the same error came this morning and the task subsequently failed.

Thanks
Nishant Verma


Nishant Verma

Feb 26, 2017, 1:19:02 AM
to Confluent Platform
I exported _JAVA_OPTIONS="-Xms12000m -Xmx12000m" in .bashrc on all the nodes. Now at least the Java heap space error is not coming.
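For reference, the change I made, plus a sketch of a more targeted alternative (as far as I know the Kafka start scripts read KAFKA_HEAP_OPTS, so the heap could be raised for the Connect worker JVM alone instead of for every Java process):

export _JAVA_OPTIONS="-Xms12000m -Xmx12000m"   # applies to every JVM started from this shell
export KAFKA_HEAP_OPTS="-Xms12g -Xmx12g"       # read by kafka-run-class.sh / connect-distributed.sh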

But the issue is that even though the Connect workers are continuously reading data from Kafka and no error is thrown, data is still not getting pushed to HDFS. There is no space crunch, and no error or exception is shown in nohup.out. I am not able to figure out what is wrong here. If I delete the <topic_name> directory from the /topics path, then Connect starts writing data to HDFS, but only for some time; it then stops writing to HDFS again.

I did a grep for "Starting commit and rotation for topic partition" in nohup.out and only a couple of occurrences of this string show up.
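The exact check, for reference (assuming the worker output goes to nohup.out as described):

grep -c "Starting commit and rotation for topic partition" nohup.out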

Why is data not written to HDFS despite no error or exception being thrown? This is my curl command:

curl -H "Content-Type: application/json" -X POST -d '{"name": "kafka-to-hdfs-2", "config": {"connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector", "format.class": "com.qubole.streamx.SourceFormat", "tasks.max": "4", "hdfs.url": "hdfs://ip-10-16-37-124:9000", "topics": "Prd_IN_GeneralEvents,Prd_IN_Alerts,Prd_IN_TripAnalysis", "partitioner.class": "io.confluent.connect.hdfs.partitioner.DailyPartitioner", "partitioner.class": "io.confluent.connect.hdfs.partitioner.HourlyPartitioner", "locale": " en.UTF-8", "flush.size": "300", "timezone": "Asia/Calcutta" }}' http://localhost:8083/connectors

Data is present in Kafka.

Thanks
Nishant Verma


Dustin Cote

Feb 27, 2017, 3:39:41 PM
to confluent...@googlegroups.com
If data is being written for a while and then stops, it seems like maybe your data write rate to Kafka has changed and you aren't reaching the flush.size. Check the WAL directory to see whether data is being written there. This will be in your `logs.dir` directory on HDFS. It is possible that data is being written there but not yet moved to the final destination because the roll policy hasn't been triggered.
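Something like this should show whether the WAL and temp files are being written (a sketch, using placeholder topic names and the /logs and /topics/+tmp paths mentioned earlier in the thread):

hdfs dfs -ls -R /logs/<topic-name> | tail
hdfs dfs -ls -R /topics/+tmp/<topic-name> | tail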

--
Dustin Cote
Customer Operations Engineer | Confluent
Follow us: Twitter | blog

Nishant Verma

Feb 28, 2017, 3:59:32 AM
to Confluent Platform
My roll policy configurations are below:

max.poll.records=200

As of now, I am using a flush.size of around 200-500 with tasks.max set to 4.

Errors like "Error discarding temp file", "ERROR Exception on topic partition", and "WARN I/O error constructing remote block reader" are coming, as shown in the attached log file. I do have records getting flushed at times to /logs/<topic-name>, which points to the +tmp path.

Why are these errors coming, as seen in the attached file? Before this, there were times when no such error was seen in nohup.out, but no data was written to HDFS. When I deleted the /topics/<topic-name> directory, after some time that directory was recreated and data started getting written.

Did it acquire some kind of write lock on /topics/<topic-name> which was released when I deleted it? This has happened twice or thrice.

Nishant Verma

error-conn1.txt

Dustin Cote

Feb 28, 2017, 8:13:07 AM
to confluent...@googlegroups.com
It looks like there is some corruption of your HDFS filesystem:

Caused by: org.apache.hadoop.hdfs.BlockMissingException: Could not obtain block: BP-1831277630-10.16.37.124-1484306078618:blk_1073879832_144830 file=/logs/Prd_IN_GeneralEvents/227/log

I'd start looking at the integrity of HDFS, because if you have missing blocks, that can cause the problem you are seeing. If data is being written to the /logs directory then the connector is getting data to HDFS, but if you are losing blocks, rolling the WAL is going to be problematic.
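A check along these lines would be a reasonable starting point (a sketch, using the paths from your stack trace):

hdfs fsck /logs -files -blocks -locations | grep -i -E "missing|corrupt"
hdfs fsck / -list-corruptfileblocks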


Nishant Verma

Mar 3, 2017, 7:56:06 AM
to confluent...@googlegroups.com
I did check my HDFS integrity, but all I found was that the filesystem at '/' is HEALTHY.

I am getting logs like:

[2017-03-03 18:17:46,814] ERROR Recovery failed at state RECOVERY_PARTITION_PAUSED (io.confluent.connect.hdfs.TopicPartitionWriter:229)
org.apache.kafka.connect.errors.ConnectException: Error creating writer for log file hdfs://ip-10-16-37-124:9000/logs/Prd_IN_GeneralEvents/349/log

org.apache.kafka.connect.errors.ConnectException: Error creating writer for log file hdfs://ip-10-16-37-124:9000/logs/Prd_IN_GeneralEvents/87/log

For all the partitions.

Nishant

sent from handheld device. please ignore typos.
