HDFS Connector consumed 400K+ messages before throwing CommitFailedException - no output file!


Uday Menon

Jan 7, 2016, 2:49:58 PM
to Confluent Platform
We set up a pipeline comprising a JDBC connector (publisher) and an HDFS connector (consumer), which effectively pumps data out of a SQL Server table into HDFS. A standalone worker was used for the HDFS connector as follows:

/usr/bin/connect-standalone /etc/schema-registry/connect-avro-standalone2.properties /etc/kafka-connect-hdfs/test-hdfs.properties

The HDFS connector ran fine for 1 hr 45 min (11:30 - 1:22), processing 400K+ messages, before throwing a CommitFailedException. The last few lines in the console show that 454220 rows were processed:

[2016-01-07 13:19:56,829] INFO Committed hdfs://prodny-hdp03.mio.local:8020/topics/test-sqlserver-jdbc-CLEAN_VLTLTY_CORR_TRD_OUTCOME/partition=0/test-sqlserver-jdbc-CLEAN_VLTLTY_CORR_TRD_OUTCOME+0+0000454215+0000454217.avro for test-sqlserver-jdbc-CLEAN_VLTLTY_CORR_TRD_OUTCOME-0 (io.confluent.connect.hdfs.TopicPartitionWriter:577)
[2016-01-07 13:19:56,830] INFO Starting commit and rotation for topic partition test-sqlserver-jdbc-CLEAN_VLTLTY_CORR_TRD_OUTCOME-0 with start offsets {partition=0=454218} and end offsets {partition=0=454220} (io.confluent.connect.hdfs.TopicPartitionWriter:267)
[2016-01-07 13:19:56,875] INFO Committed hdfs://prodny-hdp03.mio.local:8020/topics/test-sqlserver-jdbc-CLEAN_VLTLTY_CORR_TRD_OUTCOME/partition=0/test-sqlserver-jdbc-CLEAN_VLTLTY_CORR_TRD_OUTCOME+0+0000454218+0000454220.avro for test-sqlserver-jdbc-CLEAN_VLTLTY_CORR_TRD_OUTCOME-0 (io.confluent.connect.hdfs.TopicPartitionWriter:577)
[2016-01-07 13:19:56,878] INFO org.apache.kafka.connect.runtime.WorkerSinkTask@24376b4d Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:187)
[2016-01-07 13:19:56,879] INFO Marking the coordinator 2147483647 dead. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:529)
[2016-01-07 13:19:56,879] ERROR Error ILLEGAL_GENERATION occurred while committing offsets for group connect-hdfs-sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:550)
[2016-01-07 13:19:56,879] ERROR Commit of Thread[WorkerSinkTask-hdfs-sink-0,5,main] offsets threw an unexpected exception:  (org.apache.kafka.connect.runtime.WorkerSinkTask:101)
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:552)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:493)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:665)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:644)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)

The process did not crash, but it isn't doing anything useful at this point. I keep seeing the following lines in the console:

[2016-01-07 14:14:11,570] INFO Attempt to join group connect-hdfs-sink failed due to unknown member id, resetting and retrying. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:354)
[2016-01-07 14:14:11,576] INFO Started recovery for topic partition test-sqlserver-jdbc-CLEAN_VLTLTY_CORR_TRD_OUTCOME-0 (io.confluent.connect.hdfs.TopicPartitionWriter:193)
[2016-01-07 14:16:27,737] INFO Finished recovery for topic partition test-sqlserver-jdbc-CLEAN_VLTLTY_CORR_TRD_OUTCOME-0 (io.confluent.connect.hdfs.TopicPartitionWriter:208)
[2016-01-07 14:16:27,738] INFO org.apache.kafka.connect.runtime.WorkerSinkTask@24376b4d Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:187)
[2016-01-07 14:16:27,917] ERROR Error ILLEGAL_GENERATION occurred while committing offsets for group connect-hdfs-sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:550)

Upon killing the process and checking for an output file, I found that it never wrote anything to HDFS! Was this caused by the shutdown not being graceful? Here's the console output for the shutdown:

[2016-01-07 14:16:39,279] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:68)
[2016-01-07 14:16:40,339] INFO Stopped ServerConnector@4816c290{HTTP/1.1}{0.0.0.0:8085} (org.eclipse.jetty.server.ServerConnector:306)
[2016-01-07 14:16:41,910] INFO Stopped o.e.j.s.ServletContextHandler@5ae76500{/,null,UNAVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:865)
[2016-01-07 14:16:41,912] INFO Herder stopping (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:62)
[2016-01-07 14:16:41,912] INFO Stopping task hdfs-sink-0 (org.apache.kafka.connect.runtime.Worker:305)
[2016-01-07 14:16:41,914] INFO Starting graceful shutdown of thread WorkerSinkTask-hdfs-sink-0 (org.apache.kafka.connect.util.ShutdownableThread:119)
[2016-01-07 14:16:46,967] INFO Forcing shutdown of thread WorkerSinkTask-hdfs-sink-0 (org.apache.kafka.connect.util.ShutdownableThread:141)
[2016-01-07 14:16:46,967] ERROR Graceful stop of task org.apache.kafka.connect.runtime.WorkerSinkTask@24376b4d failed. (org.apache.kafka.connect.runtime.Worker:312)
Exception in thread "Thread-1" java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
        at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1294)

I have 2 questions:

1. How can I avoid the CommitFailedException?
2. If the process does throw exceptions, what is the mechanism for graceful shutdown so that the output file is preserved?

Thanks


Ewen Cheslack-Postava

Jan 8, 2016, 12:00:00 AM
to Confluent Platform
In this case, the CommitFailedException actually shouldn't matter because the HDFS connector manages its own offsets. Further, this error is logged in that code, but then it should proceed normally -- the failure of the commit is obviously not ideal, but should not affect correctness because in the worst case a failure would just result in re-processing some messages.
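
For what it's worth, the gist of "manages its own offsets" is that on (re)start the task can work out the last offset actually committed to HDFS and rewind the consumer to it, so the Kafka-side offset commit is only advisory. A rough sketch of that idea, assuming a hypothetical helper that reads the offsets encoded in the committed filenames (this is an illustration, not the connector's actual code):

// illustration only -- not the HdfsSinkConnector source; assumes a hypothetical
// helper that scans committed filenames of the form <topic>+<partition>+<start>+<end>.avro
private void rewindToLastCommitted(SinkTaskContext context, String topic, int partition) {
    long lastCommittedEnd = findLatestEndOffsetInHdfs(topic, partition);  // hypothetical helper
    Map<TopicPartition, Long> resume = new HashMap<>();
    resume.put(new TopicPartition(topic, partition), lastCommittedEnd + 1);
    context.offset(resume);  // rewinds the consumer, so a failed Kafka offset
                             // commit at worst causes some records to be re-read
}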

The fact that it ran for so long without committing a file is more concerning. Can you share the configuration you're using for the connector? Normally it should rotate to a new file after it reaches a specified number of messages (flush.size) or after a specified interval (rotate.interval.ms). 400K messages sounds like a lot for it not to have reached one of those conditions.
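
For reference, a minimal example of those two settings in the connector properties file (the values below are placeholders, not recommendations):

# commit/rotate a file once this many records have been written...
flush.size=10000
# ...or once this many milliseconds have passed, whichever comes first
rotate.interval.ms=60000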

Both the connector and the framework have to handle all types of exceptions carefully. The HDFS connector was written so it can recover from exceptions, picking up where it previously left off and carefully handling the records it is passed so it doesn't lose any due to an exception. In the framework, we're catching exceptions and attempting to handle them gracefully, but there is still work to be done to improve how this is handled -- status tracking (including REST API support) and control over restarting failed tasks are planned for the next release.

-Ewen


Uday Menon

Jan 8, 2016, 3:21:50 PM
to Confluent Platform
Hi Ewen,

Here's the config file I used for the hdfs connector:

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=test-sqlserver-jdbc-CLEAN_VLTLTY_CORR_TRD_OUTCOME
flush.size=3
#hdfs.url=hdfs://localhost:9000
hdfs.url=hdfs://prodny-hdp03.mio.local:8020

From your post, it would appear that my flush.size is too low. Regardless, that doesn't explain why there are no files. I tried restarting the standalone worker for the HDFS connector but started seeing OutOfMemory errors. I'm in the process of restarting the whole setup after controlling for database table size (I'll pick a smaller table), etc.
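
One thing that might help with the OutOfMemory errors, assuming the stock launch scripts are in use (connect-standalone goes through kafka-run-class, which honors KAFKA_HEAP_OPTS), is to give the worker JVM a larger heap when starting it; treat this as an untested sketch:

# assumption: the standard Kafka scripts are in use, so KAFKA_HEAP_OPTS is picked up
KAFKA_HEAP_OPTS="-Xmx2g" /usr/bin/connect-standalone \
    /etc/schema-registry/connect-avro-standalone2.properties \
    /etc/kafka-connect-hdfs/test-hdfs.properties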

Thanks
Uday

Ewen Cheslack-Postava

Jan 8, 2016, 7:06:18 PM
to Confluent Platform
Uday,

When file commit/rotation starts, there should be a log entry that starts with:

Starting commit and rotation for topic partition

It is logged at INFO level from io.confluent.connect.hdfs.TopicPartitionWriter. Do you see this anywhere in your logs? The checks that trigger that message are very straightforward (see https://github.com/confluentinc/kafka-connect-hdfs/blob/master/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java#L376), so if the connector is not hitting that condition, there should be other information in the logs that indicates an error at some other stage of processing messages.
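
Roughly paraphrased (the field names here are made up for illustration; this is not the literal source), that check amounts to:

// illustrative paraphrase of the rotation check, not the actual TopicPartitionWriter source
private boolean shouldRotate(long now) {
    boolean sizeReached = recordCounter >= flushSize;          // flush.size reached
    boolean timeReached = rotateIntervalMs > 0
        && now - lastRotateTimeMs >= rotateIntervalMs;         // rotate.interval.ms elapsed
    return sizeReached || timeReached;
}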

-Ewen


Stephanie Lv

Jun 15, 2016, 12:49:49 PM
to Confluent Platform
Hi Uday,

I didn't get the CommitFailedException, but I had the same issue when "flush.size" was set to a huge number.

It turns out that the next file only shows up in the HDFS destination once enough new records have arrived to match "flush.size"; that's most likely why you couldn't see the HDFS connector writing any data. Also, I found another folder under "topics.dir" named "+tmp", which contains a per-topic folder like /topicName/partition=0/*.avro. The .avro file there looks like a temporary cache of the records written before the count reaches "flush.size". I wasn't able to read the data inside that .avro file, though, probably because HDFS doesn't make a file readable while it is still being written.
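
For example, you can list both locations to see where the records currently are (the paths below assume the default topics.dir of /topics and the topic name from this thread):

# committed files -- these only appear once flush.size records have accumulated
hdfs dfs -ls /topics/test-sqlserver-jdbc-CLEAN_VLTLTY_CORR_TRD_OUTCOME/partition=0
# in-progress temporary files under the +tmp folder
hdfs dfs -ls /topics/+tmp/test-sqlserver-jdbc-CLEAN_VLTLTY_CORR_TRD_OUTCOME/partition=0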

I've only been playing with Confluent.IO/Kafka for a few weeks, so I could be totally wrong with my assumptions. Just take this as a reference. :)
