<Saravanan> Thanks. This is clear..
There are multiple errors still surfacing and we would need your expertise guidance.. Below you see list of errors -
1. First one is occurring more frequently after running successfully for sometime..
2. Also i notice the offsets are not going down after my messages are sinked..and committed as per debub
**********************************************************************************************
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1282)
at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1213)
at org.apache.kafka.connect.runtime.WorkerSinkTask.close(WorkerSinkTask.java:128)
at org.apache.kafka.connect.runtime.Worker.stopTask(Worker.java:313)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$14.onRevoked(DistributedHerder.java:898)
at org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.onJoinPrepare(WorkerCoordinator.java:238)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:209)
at org.apache.kafka.connect.runtime.distributed.WorkerGroupMember.poll(WorkerGroupMember.java:144)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:266)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:159)
at java.lang.Thread.run(Thread.java:745)
**********************************************************************************************
org.apache.kafka.common.errors.InterruptException: Flush interrupted.
at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:548)
at org.apache.kafka.connect.util.KafkaBasedLog.readToEnd(KafkaBasedLog.java:192)
at org.apache.kafka.connect.storage.KafkaOffsetBackingStore.get(KafkaOffsetBackingStore.java:112)
at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:78)
at com.walmart.ei.oms.transformation.OMSSourceTask.start(OMSSourceTask.java:80)
at org.apache.kafka.connect.runtime.WorkerSourceTask$WorkerSourceTaskThread.execute(WorkerSourceTask.java:341)
at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
Caused by: java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:57)
at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:422)
at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:546)
... 6 more
**********************************************************************************************
org.apache.kafka.connect.errors.ConnectException: Failed to fetch offsets.
at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:81)
at com.walmart.ei.oms.transformation.OMSSourceTask.start(OMSSourceTask.java:80)
at org.apache.kafka.connect.runtime.WorkerSourceTask$WorkerSourceTaskThread.execute(WorkerSourceTask.java:341)
at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
Caused by: org.apache.kafka.common.errors.InterruptException: Flush interrupted.
at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:548)
at org.apache.kafka.connect.util.KafkaBasedLog.readToEnd(KafkaBasedLog.java:192)
at org.apache.kafka.connect.storage.KafkaOffsetBackingStore.get(KafkaOffsetBackingStore.java:112)
at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:78)
... 3 more
Caused by: java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:57)
at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:422)
at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:546)
**********************************************************************************************
[2016-08-10 17:38:04,395] ERROR Failed to flush org.apache.kafka.connect.runtime.WorkerSourceTask$2@31437790 offsets to storage: (org.apache.kafka.connect.runtime.WorkerSourceTask)
org.apache.kafka.common.errors.TimeoutException: Batch Expired
[2016-08-10 17:38:04,395] ERROR Flush of WorkerSourceTask{id=ei-oms-connector-test-240-2} offsets threw an unexpected exception: (org.apache.kafka.connect.runtime.WorkerSourceTask)
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Batch Expired
at org.apache.kafka.connect.storage.KafkaOffsetBackingStore$SetCallbackFuture.get(KafkaOffsetBackingStore.java:207)
at org.apache.kafka.connect.storage.KafkaOffsetBackingStore$SetCallbackFuture.get(KafkaOffsetBackingStore.java:140)
at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:284)
at org.apache.kafka.connect.runtime.WorkerSourceTask$WorkerSourceTaskThread.execute(WorkerSourceTask.java:370)
at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
Caused by: org.apache.kafka.common.errors.TimeoutException: Batch Expired
**********************************************************************************************
Error UNKNOWN_MEMBER_ID occurred while committing offsets for group ei-transformation-pos-client-test-180 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:546)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:487)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:681)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:654)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:350)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:288)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:303)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:197)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:187)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:157)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorKnown(AbstractCoordinator.java:182)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:857)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:829)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:171)
at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)