hdfs connect error


Shannon Ma

Oct 14, 2016, 5:24:58 PM
to Confluent Platform
Hi,

I set up an HDFS connector and it was working for a while. Then, in the log, I first see:


16:12| INFO | AbstractCoordinator.java 434 | Successfully joined group connect-hdfs-traffic-sink-p with generation 1
16:12| INFO | ConsumerCoordinator.java 219 | Setting newly assigned partitions [TRAFFIC_DATA_P-0] for group connect-hdfs-traffic-sink-p
16:12| INFO | TopicPartitionWriter.java 193 | Started recovery for topic partition TRAFFIC_DATA_P-0
16:12| ERROR | TopicPartitionWriter.java 214 | Recovery failed at state RECOVERY_PARTITION_PAUSED
org.apache.kafka.connect.errors.ConnectException: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.RecoveryInProgressException): Failed to close file /tmp/TRAFFIC_DATA_P/0/log. Lease recovery is in progress. Try again later.
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:3141)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2933)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:3217)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:3181)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:614)
at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.append(AuthorizationProviderProxyClientProtocol.java:126)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:416)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2086)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2082)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2080)

at io.confluent.connect.hdfs.wal.FSWAL.acquireLease(FSWAL.java:88)
at io.confluent.connect.hdfs.wal.FSWAL.apply(FSWAL.java:105)
at io.confluent.connect.hdfs.TopicPartitionWriter.applyWAL(TopicPartitionWriter.java:441)
at io.confluent.connect.hdfs.TopicPartitionWriter.recover(TopicPartitionWriter.java:197)
at io.confluent.connect.hdfs.DataWriter.recover(DataWriter.java:239)
at io.confluent.connect.hdfs.DataWriter.open(DataWriter.java:281)
at io.confluent.connect.hdfs.HdfsSinkTask.open(HdfsSinkTask.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:428)
at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1000(WorkerSinkTask.java:54)
at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:464)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
at org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:316)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:222)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.RecoveryInProgressException): Failed to close file /tmp/TRAFFIC_DATA_P/0/log. Lease recovery is in progress. Try again later.
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:3141)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2933)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:3217)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:3181)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:614)
at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.append(AuthorizationProviderProxyClientProtocol.java:126)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:416)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2086)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2082)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2080)

at org.apache.hadoop.ipc.Client.call(Client.java:1468)
at org.apache.hadoop.ipc.Client.call(Client.java:1399)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
at com.sun.proxy.$Proxy15.append(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.append(ClientNamenodeProtocolTranslatorPB.java:313)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy16.append(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.callAppend(DFSClient.java:1767)
at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1803)
at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1796)
at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:323)
at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:319)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:319)
at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1173)
at io.confluent.connect.hdfs.wal.WALFile$Writer.<init>(WALFile.java:221)
at io.confluent.connect.hdfs.wal.WALFile.createWriter(WALFile.java:67)
at io.confluent.connect.hdfs.wal.FSWAL.acquireLease(FSWAL.java:73)
... 45 more
16:13| INFO | FSWAL.java 75 | Successfully acquired lease for hdfs://VM03cent7:8020/tmp/TRAFFIC_DATA_P/0/log
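This first error looks transient: the NameNode is still recovering the lease on the WAL file /tmp/TRAFFIC_DATA_P/0/log and tells the client to "Try again later", and the 16:13 line shows the lease was acquired shortly afterwards. A retry-with-backoff wrapper illustrates that pattern. This is a minimal sketch, not the connector's own recovery logic; the class `LeaseRetry`, the method `retryWhileRecovering`, and the backoff values are hypothetical, and matching on the exception's message text is an assumption made only for illustration.

```java
import java.util.concurrent.Callable;

// Hypothetical helper (not part of the HDFS connector): retries an operation
// while its failure message says HDFS lease recovery is still in progress.
class LeaseRetry {

    // Marker text copied from the RecoveryInProgressException message in the log.
    static final String LEASE_MARKER = "Lease recovery is in progress";

    static <T> T retryWhileRecovering(Callable<T> op, int maxAttempts, long initialBackoffMs) {
        long backoffMs = initialBackoffMs;
        for (int attempt = 1; ; attempt++) {
            try {
                return op.call();
            } catch (Exception e) {
                boolean recovering = String.valueOf(e.getMessage()).contains(LEASE_MARKER);
                // Unrelated errors, or running out of attempts, end the loop immediately.
                if (!recovering || attempt >= maxAttempts) {
                    throw new IllegalStateException("giving up after " + attempt + " attempt(s)", e);
                }
            }
            try {
                Thread.sleep(backoffMs);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                throw new IllegalStateException("interrupted while waiting for lease recovery", ie);
            }
            backoffMs *= 2; // exponential backoff between attempts
        }
    }
}
```

Only the lease-recovery failure is retried; any other exception surfaces on the first attempt, so genuine faults (such as the datanode error later in this thread) are not masked.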



-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------


Then I see:



16:15| WARN | DFSOutputStream.java 691 | DataStreamer Exception
java.io.IOException: Failed to replace a bad datanode on the existing pipeline due to no more good datanodes being available to try. (Nodes: current=[10.200.5.162:50010], original=[10.200.5.162:50010]). The current failed datanode replacement policy is DEFAULT, and a client may configure this via 'dfs.client.block.write.replace-datanode-on-failure.policy' in its configuration.
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.findNewDatanode(DFSOutputStream.java:1040)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:1106)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1253)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:594)
16:15| ERROR | TopicPartitionWriter.java 328 | Error closing hdfs://VM03cent7:8020/tmp/TRAFFIC_DATA_P/0/log.
org.apache.kafka.connect.errors.ConnectException: Error closing hdfs://VM03cent7:8020/tmp/TRAFFIC_DATA_P/0/log
at io.confluent.connect.hdfs.wal.FSWAL.close(FSWAL.java:156)
at io.confluent.connect.hdfs.TopicPartitionWriter.close(TopicPartitionWriter.java:326)
at io.confluent.connect.hdfs.DataWriter.close(DataWriter.java:296)
at io.confluent.connect.hdfs.HdfsSinkTask.close(HdfsSinkTask.java:109)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:301)
at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:432)
at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1100(WorkerSinkTask.java:54)
at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsRevoked(WorkerSinkTask.java:476)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:283)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:212)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:316)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:222)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: java.io.IOException: Failed to replace a bad datanode on the existing pipeline due to no more good datanodes being available to try. (Nodes: current=[10.200.5.162:50010], original=[10.200.5.162:50010]). The current failed datanode replacement policy is DEFAULT, and a client may configure this via 'dfs.client.block.write.replace-datanode-on-failure.policy' in its configuration.
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.findNewDatanode(DFSOutputStream.java:1040)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:1106)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1253)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:594)
16:15| ERROR | DataWriter.java 298 | Error closing writer for TRAFFIC_DATA_P-0. Error: {]
16:15| INFO | WorkerSinkTask.java 261 | WorkerSinkTask{id=hdfs-traffic-sink-p-0} Committing offsets
16:15| INFO | AbstractCoordinator.java 326 | (Re-)joining group connect-hdfs-traffic-sink-p
16:15| INFO | AbstractCoordinator.java 434 | Successfully joined group connect-hdfs-traffic-sink-p with generation 2
16:15| INFO | ConsumerCoordinator.java 219 | Setting newly assigned partitions [TRAFFIC_DATA_P-0] for group connect-hdfs-traffic-sink-p
16:15| INFO | TopicPartitionWriter.java 193 | Started recovery for topic partition TRAFFIC_DATA_P-0
16:15| INFO | AbstractCoordinator.java 434 | Successfully joined group connect-hdfs-traffic-sink-p with generation 2
16:15| INFO | ConsumerCoordinator.java 219 | Setting newly assigned partitions [] for group connect-hdfs-traffic-sink-p
16:15| INFO | TopicPartitionWriter.java 208 | Finished recovery for topic partition TRAFFIC_DATA_P-0
16:16| ERROR | WorkerSinkTask.java 401 | Task hdfs-traffic-sink-p-0 threw an uncaught and unrecoverable exception
java.lang.NullPointerException
at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:234)
at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:91)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:381)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:227)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
16:16| ERROR | WorkerSinkTask.java 402 | Task is being killed and will not recover until manually restarted
16:16| ERROR | WorkerTask.java 142 | Task hdfs-traffic-sink-p-0 threw an uncaught and unrecoverable exception
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:403)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:227)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
16:16| ERROR | WorkerTask.java 143 | Task is being killed and will not recover until manually restarted
16:16| INFO | MonitoringInterceptor.java 161 | Publish thread interrupted!
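The DataStreamer warning names the client setting dfs.client.block.write.replace-datanode-on-failure.policy. As hdfs-default.xml describes it (and only when dfs.client.block.write.replace-datanode-on-failure.enable is true), ALWAYS always adds a replacement datanode, NEVER never does, and DEFAULT adds one only if the replication r is at least 3 and either floor(r/2) >= n, or r > n and the block is hflushed/appended, where n is the number of datanodes left in the pipeline. The sketch below encodes those documented rules; the class and method names are hypothetical and this is not Hadoop's own code.

```java
// Sketch of the replace-datanode-on-failure policy rules as worded in hdfs-default.xml.
// Hypothetical names; this is not Hadoop's implementation.
class ReplaceDatanodePolicy {

    enum Policy { DEFAULT, ALWAYS, NEVER }

    /**
     * @param replication        r, the file's replication factor
     * @param existingDatanodes  n, datanodes still in the write pipeline
     * @param hflushedOrAppended whether the block was hflushed or is being appended
     * @return true if the client would try to add a replacement datanode
     */
    static boolean shouldAddDatanode(Policy policy, int replication,
                                     int existingDatanodes, boolean hflushedOrAppended) {
        switch (policy) {
            case ALWAYS:
                return true;
            case NEVER:
                return false;
            default: // DEFAULT
                if (replication < 3) {
                    return false;
                }
                boolean tooFewLeft = replication / 2 >= existingDatanodes; // floor(r/2) >= n
                boolean shortAndAppended = replication > existingDatanodes && hflushedOrAppended;
                return tooFewLeft || shortAndAppended;
        }
    }
}
```

Assuming the default dfs.replication of 3 (the thread does not say what replication is configured) and the single datanode shown in the log, shouldAddDatanode(DEFAULT, 3, 1, true) is true, since the WAL file is opened with append. The client would then look for a replacement, and with only one datanode there is none to find, which is consistent with the "no more good datanodes being available to try" message.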

Shannon Ma

Oct 14, 2016, 5:36:39 PM
to Confluent Platform
Now I have set the configuration as the message suggested, cleared the HDFS directory, and cleaned out all Kafka data under /tmp so everything started fresh. Now I am getting this:




17:33| ERROR | WorkerSinkTask.java 401 | Task hdfs-traffic-sink-p-0 threw an uncaught and unrecoverable exception
java.lang.NullPointerException
  at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:234)
  at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:91)
  at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:381)
  at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:227)
  at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
  at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
  at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
  at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
  at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
  at java.util.concurrent.FutureTask.run(Unknown Source)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
  at java.lang.Thread.run(Unknown Source)
17:33| ERROR | WorkerSinkTask.java 402 | Task is being killed and will not recover until manually restarted
17:33| ERROR | WorkerTask.java 142 | Task hdfs-traffic-sink-p-0 threw an uncaught and unrecoverable exception
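The post does not say which policy value was set. For very small clusters, the hdfs-default.xml description of this feature suggests NEVER (or disabling the feature). One hedged way to apply such a client override is an hdfs-site.xml placed in the directory that the connector's hadoop.conf.dir setting points to. The helper below only renders that XML; the class name is hypothetical, the NEVER value is an assumption, and it performs no XML escaping, so it assumes plain property names and values.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: renders Hadoop client properties as an hdfs-site.xml document.
// Assumes names and values contain no characters that need XML escaping.
class HdfsSiteXml {

    static String render(Map<String, String> props) {
        StringBuilder xml = new StringBuilder();
        xml.append("<?xml version=\"1.0\"?>\n<configuration>\n");
        for (Map.Entry<String, String> e : props.entrySet()) {
            xml.append("  <property>\n")
               .append("    <name>").append(e.getKey()).append("</name>\n")
               .append("    <value>").append(e.getValue()).append("</value>\n")
               .append("  </property>\n");
        }
        return xml.append("</configuration>\n").toString();
    }

    static Map<String, String> smallClusterOverrides() {
        Map<String, String> props = new LinkedHashMap<>();
        // NEVER: do not look for a replacement datanode when one in the write pipeline fails.
        props.put("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
        return props;
    }
}
```

Note that in this thread the NullPointerException still appeared after the configuration change, so this setting alone did not resolve the problem.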

Dustin Cote

Oct 14, 2016, 7:48:42 PM
to confluent...@googlegroups.com

The message below indicates that HDFS is down because all datanodes are unreachable. You should probably have a look at the HDFS side and make sure you have live datanodes; the NameNode UI is a good place to start.

Caused by: java.io.IOException: Failed to replace a bad datanode on the existing pipeline due to no more good datanodes being available to try. (Nodes: current=[10.200.5.162:50010], original=[10.200.5.162:50010]). The current failed datanode replacement policy is DEFAULT, and a client may configure this via 'dfs.client.block.write.replace-datanode-on-failure.policy' in its configuration.
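Following this suggestion, a first step is to identify which datanodes the client was writing to. The error text lists them in its "Nodes: current=[...], original=[...]" section; here both lists contain only 10.200.5.162:50010, so that is the datanode to check in the NameNode UI. The helper below extracts those addresses. It is a hypothetical diagnostic that relies on the message format seen in this log, which is not a stable API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical diagnostic helper: pulls datanode addresses out of the
// "Nodes: current=[...], original=[...]" part of the DataStreamer error text.
class PipelineNodes {

    private static final Pattern LIST = Pattern.compile("(current|original)=\\[([^\\]]*)\\]");

    /** Returns the addresses in the named list ("current" or "original"), or an empty list. */
    static List<String> nodes(String message, String which) {
        Matcher m = LIST.matcher(message);
        while (m.find()) {
            if (m.group(1).equals(which)) {
                List<String> out = new ArrayList<>();
                for (String s : m.group(2).split(",")) {
                    if (!s.trim().isEmpty()) {
                        out.add(s.trim());
                    }
                }
                return out;
            }
        }
        return new ArrayList<>();
    }
}
```

A pipeline list with a single address, as here, means the client had no other datanode to fall back on.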


Shannon Ma

Oct 17, 2016, 11:51:48 AM
to Confluent Platform
Thanks, I will check the HDFS side.