Clarification on Kafka Connect running in distributed mode - Production


Saravanan Tirugnanum

Aug 8, 2016, 9:18:05 PM
to Confluent Platform, Saravanan VT
Hi Ewen,

I have a list of questions about a Connect cluster (v0.9.0.0) running in distributed mode.

1. Suppose the messages or partitions of the connect-configs / connect-offsets topics get messed up for some reason. I suspect that, because of this, the connectors and tasks are not loading properly and are constantly trying to rejoin the group. I also noticed that giving the connector a new name through the REST API resolves the errors; my guess is that the name is used as the key, along with other key parameters, when the connector is registered in the config topic. So the question is: what is the best way to clean up the connect-configs topic? I tried changing the cleanup policy to delete, but that doesn't seem to work.

2. After some time, my logs fill up with frequent debug statements (I am running in test) showing metadata requests being sent continuously. After some research, I found that when my task thread, which holds a Kafka producer instance, has been waiting to send messages for more than 5 minutes, the constant metadata request retries begin. I decided to increase metadata.max.age.ms, but wanted to check whether this is the correct approach.

3. I have two Connect clusters (say X with two nodes, x1 and x2, and Y with another two nodes, y1 and y2), both connected to the same Kafka cluster. When I deploy a connector through Y's REST service, cluster X also tries to load the same connector and fails (because the classes are not available there). Is this expected behavior? I then tried adding the cluster property to the worker properties (I believe it creates a unique namespace for the Connect cluster), but this doesn't seem to work either. Any thoughts?

4. According to the debug statements in the log, tasks are being stopped and reassignments happen. After running successfully for some time, and without any changes on my side, I also see graceful stops of tasks failing, along with errors like "Sink task org.apache.kafka.connect.runtime.WorkerSinkTask@e1c86ac was stopped before completing join group. Task initialization and start is being skipped (org.apache.kafka.connect.runtime.WorkerSinkTask)". Have I messed up anything major? :)


Standalone mode works great; all of these problems started surfacing in distributed mode. I guess I need to research more. Any help would be much appreciated, along with a link to any FAQ on setting up and configuring Connect for production.


Regards
Saravanan

Ewen Cheslack-Postava

Aug 9, 2016, 11:29:19 PM
to Confluent Platform, Saravanan VT
Re point 1: This sounds odd. I'm not sure why adding a new connector would resolve the issue (I assume that is what you mean by giving a new name, since there is no renaming functionality). The config topic should use compaction; a time- or size-based deletion policy would mean configs "expire" after some time. It should also be a single-partition topic.
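
As a minimal sketch of the advice above, the config topic could be created ahead of time as a compacted, single-partition topic. The Python helper below only assembles the argument list for the stock kafka-topics tool of that era; the script name, ZooKeeper address, topic name, and replication factor are assumptions for illustration, not values from this thread. Newer Kafka releases take --bootstrap-server instead of --zookeeper.

```python
def config_topic_create_args(topic, zookeeper, replication_factor):
    """Build kafka-topics arguments for a compacted, single-partition topic."""
    return [
        "kafka-topics", "--create",
        "--zookeeper", zookeeper,                 # assumed ZooKeeper address
        "--topic", topic,
        "--partitions", "1",                      # config topic: one partition
        "--replication-factor", str(replication_factor),
        "--config", "cleanup.policy=compact",     # compaction, not deletion
    ]


if __name__ == "__main__":
    # Hypothetical values; print the command so it can be reviewed before use.
    print(" ".join(config_topic_create_args("connect-configs", "localhost:2181", 3)))
```

Printing the command rather than running it keeps the sketch side-effect free; the output can be pasted into a shell once the values have been checked against the actual environment.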
 
Re point 2: When you say constant, how frequent is it? And are there any other messages indicating an error?

I vaguely remember a bug like this but am having trouble finding it now. It would be an issue with the producer, not Connect itself. If it is refreshing too frequently, there should be something in the log indicating why it's doing that.
 

Re point 3: Did you give each cluster its own config/offsets/status topics? These are meant to be dedicated per Connect cluster. Sharing them is the only way I can think of that the clusters would "share" data.
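
One way to catch this kind of overlap early is to compare the worker properties of the two clusters before starting them. The sketch below is only an illustration: the cluster and topic names are hypothetical, and group.id is included in the check as an addition to the topics mentioned above, because workers that share a group.id join the same Connect cluster. The status.storage.topic setting only exists in newer Connect releases, so it is treated as optional.

```python
# Settings that should differ between two independent Connect clusters.
ISOLATION_KEYS = (
    "group.id",
    "config.storage.topic",
    "offset.storage.topic",
    "status.storage.topic",  # optional: not present in older Connect versions
)


def shared_settings(worker_a, worker_b):
    """Return the isolation keys on which two worker configs collide."""
    return [
        key for key in ISOLATION_KEYS
        if key in worker_a and worker_a.get(key) == worker_b.get(key)
    ]


# Hypothetical worker properties for clusters X and Y.
cluster_x = {
    "group.id": "connect-x",
    "config.storage.topic": "connect-x-configs",
    "offset.storage.topic": "connect-x-offsets",
}
cluster_y = {
    "group.id": "connect-y",
    "config.storage.topic": "connect-y-configs",
    "offset.storage.topic": "connect-y-offsets",
}

if __name__ == "__main__":
    print(shared_settings(cluster_x, cluster_y))
```

An empty result means the two configurations do not share a group or a storage topic; any key that is returned points at a setting the clusters would be sharing.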
 
Re point 4: The "stopped before completing join group" message just means that the task didn't get a chance to finish joining its consumer group before the cluster told it to shut the task down for another rebalance. This doesn't necessarily indicate a serious problem, but if it happens too often, something is causing the entire Connect cluster to rebalance frequently. This would usually be connectors resetting their task configs or workers timing out for some reason (neither of which should happen *that* frequently). If you just see it occasionally, it's fine. We warn about it in the log because if you see it a lot, your connectors are not going to be able to make progress.
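
To judge whether the message shows up "occasionally" or "a lot", a rough approach is to count its occurrences per hour in the worker log. The sketch below assumes log lines start with a timestamp in the "[2016-08-10 17:38:04,395]" form seen elsewhere in this thread; the sample lines are made up for illustration, and what counts as too frequent is left to the reader.

```python
import re
from collections import Counter

MARKER = "was stopped before completing join group"
# Captures the "YYYY-MM-DD HH" prefix of a "[YYYY-MM-DD HH:MM:SS,mmm]" timestamp.
TIMESTAMP = re.compile(r"^\[(\d{4}-\d{2}-\d{2} \d{2}):\d{2}:\d{2},\d{3}\]")


def warnings_per_hour(lines):
    """Count log lines containing MARKER, bucketed by date and hour."""
    counts = Counter()
    for line in lines:
        match = TIMESTAMP.match(line)
        if match and MARKER in line:
            counts[match.group(1)] += 1
    return counts


if __name__ == "__main__":
    # Hypothetical log lines, not taken from a real worker log.
    sample = [
        "[2016-08-10 17:38:04,395] WARN Sink task A was stopped before completing join group.",
        "[2016-08-10 17:52:10,001] WARN Sink task B was stopped before completing join group.",
        "[2016-08-10 18:03:00,120] INFO Some unrelated message",
    ]
    for hour, count in sorted(warnings_per_hour(sample).items()):
        print(hour, count)
```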

-Ewen
 


--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsub...@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/665a35be-bfc2-4a6e-bcfd-f77808de888f%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.



--
Thanks,
Ewen

Saravanan Tirugnanum

Aug 12, 2016, 5:46:28 PM
to Confluent Platform, sara...@yahoo.co.in
Thanks a lot, Ewen. Please find my responses below, along with some further questions and the errors I'm seeing. I'd appreciate it if you could review them.

Regards
Saravanan


<Saravanan> Re point 1: Actually, there was a separate problem: another one was loaded without my noticing, and I suppose that created the confusion. I take your points, including the single-partition topic. Also, since the topic name is configurable, we can switch to a different topic any time we feel the current one is corrupted. Thanks.

 
 
<Saravanan> Re point 2: I noticed that this is logged continuously, flooding the log, when I add a new worker instance while the other nodes are running.
 

<Saravanan> Re point 3: This is great; I wasn't aware of this. After creating separate topics for the different groups, both clusters are running fine without affecting each other.

<Saravanan> Re point 4: Thanks, this is clear.

Multiple errors are still surfacing, and we would appreciate your expert guidance. The errors are listed below.
1. The first one occurs more frequently, after the tasks have run successfully for some time.
2. I also notice that the offsets are not going down even after my messages have been written to the sink and committed, according to the debug log.


**********************************************************************************************
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
        at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1282)
        at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1213)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.close(WorkerSinkTask.java:128)
        at org.apache.kafka.connect.runtime.Worker.stopTask(Worker.java:313)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder$14.onRevoked(DistributedHerder.java:898)
        at org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.onJoinPrepare(WorkerCoordinator.java:238)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:209)
        at org.apache.kafka.connect.runtime.distributed.WorkerGroupMember.poll(WorkerGroupMember.java:144)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:266)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:159)
        at java.lang.Thread.run(Thread.java:745)
**********************************************************************************************
org.apache.kafka.common.errors.InterruptException: Flush interrupted.
        at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:548)
        at org.apache.kafka.connect.util.KafkaBasedLog.readToEnd(KafkaBasedLog.java:192)
        at org.apache.kafka.connect.storage.KafkaOffsetBackingStore.get(KafkaOffsetBackingStore.java:112)
        at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:78)
        at com.walmart.ei.oms.transformation.OMSSourceTask.start(OMSSourceTask.java:80)
        at org.apache.kafka.connect.runtime.WorkerSourceTask$WorkerSourceTaskThread.execute(WorkerSourceTask.java:341)
        at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
Caused by: java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
        at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
        at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:57)
        at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:422)
        at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:546)
        ... 6 more

**********************************************************************************************
org.apache.kafka.connect.errors.ConnectException: Failed to fetch offsets.
        at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:81)
        at com.walmart.ei.oms.transformation.OMSSourceTask.start(OMSSourceTask.java:80)
        at org.apache.kafka.connect.runtime.WorkerSourceTask$WorkerSourceTaskThread.execute(WorkerSourceTask.java:341)
        at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
Caused by: org.apache.kafka.common.errors.InterruptException: Flush interrupted.
        at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:548)
        at org.apache.kafka.connect.util.KafkaBasedLog.readToEnd(KafkaBasedLog.java:192)
        at org.apache.kafka.connect.storage.KafkaOffsetBackingStore.get(KafkaOffsetBackingStore.java:112)
        at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:78)
        ... 3 more
Caused by: java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
        at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
        at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:57)
        at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:422)
        at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:546)

**********************************************************************************************
[2016-08-10 17:38:04,395] ERROR Failed to flush org.apache.kafka.connect.runtime.WorkerSourceTask$2@31437790 offsets to storage:  (org.apache.kafka.connect.runtime.WorkerSourceTask)
org.apache.kafka.common.errors.TimeoutException: Batch Expired
[2016-08-10 17:38:04,395] ERROR Flush of WorkerSourceTask{id=ei-oms-connector-test-240-2} offsets threw an unexpected exception:  (org.apache.kafka.connect.runtime.WorkerSourceTask)
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Batch Expired
        at org.apache.kafka.connect.storage.KafkaOffsetBackingStore$SetCallbackFuture.get(KafkaOffsetBackingStore.java:207)
        at org.apache.kafka.connect.storage.KafkaOffsetBackingStore$SetCallbackFuture.get(KafkaOffsetBackingStore.java:140)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:284)
        at org.apache.kafka.connect.runtime.WorkerSourceTask$WorkerSourceTaskThread.execute(WorkerSourceTask.java:370)
        at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
Caused by: org.apache.kafka.common.errors.TimeoutException: Batch Expired


**********************************************************************************************
Error UNKNOWN_MEMBER_ID occurred while committing offsets for group ei-transformation-pos-client-test-180 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:546)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:487)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:681)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:654)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:350)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:288)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:303)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:197)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:187)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:157)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorKnown(AbstractCoordinator.java:182)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:857)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:829)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:171)
        at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
        at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
        at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)



 


Saravanan Tirugnanum

Aug 15, 2016, 2:35:23 PM
to Confluent Platform, sara...@yahoo.co.in
Based on the issues we have seen, it seems that a lot of error handling has been added in 0.10.0.0 that is not available in 0.9.0.0.
Is there a way to apply just those patches directly instead of upgrading?