Data loss in data commits in HDFS

Banjo

Jun 3, 2017, 7:02:54 AM
to Confluent Platform
I have a Kafka cluster that receives a high volume of data every hour. I am using Kafka Connect to write the Kafka topic data to HDFS.
There were no issues during the first nine hours of a load run: Kafka Connect consumed around 84,000,000 records and wrote exactly 84,000,000 records to HDFS.
But from the 10th through the 14th hour, about 35,000 records were not written to HDFS.

My Kafka Connect is a two node setup.

Both nodes received approximately half of the data produced to Kafka (we put some debug statements in the key converter class - ByteArrayConverter and ). One node committed
exactly as many records to HDFS as it consumed, but the second node committed a different number than it consumed.
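
The per-node comparison described above can be sketched as a small reconciliation check. This is only an illustration: the node names and record counts below are hypothetical (the thread does not give per-node figures), and how you obtain the consumed and committed counts depends on your own debug logging and HDFS file counts.

```python
from typing import Dict, NamedTuple


class NodeCounts(NamedTuple):
    consumed: int   # records the Connect worker read from Kafka
    committed: int  # records found in committed files on HDFS


def missing_per_node(counts: Dict[str, NodeCounts]) -> Dict[str, int]:
    """Return consumed-minus-committed for every node where the two differ."""
    return {
        node: c.consumed - c.committed
        for node, c in counts.items()
        if c.consumed != c.committed
    }


# Hypothetical numbers for illustration only, not taken from the thread.
counts = {
    "connect-node-1": NodeCounts(consumed=1_000_000, committed=1_000_000),
    "connect-node-2": NodeCounts(consumed=1_000_000, committed=965_000),
}
print(missing_per_node(counts))
```

Running the check per hour partition, rather than over the whole run, would narrow down when the gap first appeared.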

We started looking into the logs at both the Kafka Connect and HDFS levels.

HDFS level (3 DataNodes and 2 master nodes, on Amazon EC2 instances):

1- All the daemons are running fine on these five nodes.
2- No occurrence of the strings "error", "exception", "fatal", "fail", "corrupt" in the DataNode logs.
3- We found the errors below in the NameNode logs.

2017-06-02 13:30:37,983 INFO org.apache.hadoop.ipc.Server: IPC Server handler 6 on 9000, call org.apache.hadoop.hdfs.protocol.ClientProtocol.addBlock from 10.17.20.211:38046 Call#78804 Retry#0
java.io.IOException: File /topics/+tmp/<topic-name>/year=2017/month=06/day=02/hour=19/133f1ad4-2e03-4355-b16f-4135bf72c26c_tmp could only be replicated to 0 nodes instead of minReplication (=1).  There are 3 datanode(s) running and no node(s) are excluded in this operation.
        at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1571)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3107)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3031)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:725)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:492)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
2017-06-02 13:30:38,244 WARN org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy: Failed to place enough replicas, still in need of 3 to reach 3 (unavailableStorages=[], storagePolicy=BlockStoragePolicy{HOT:7, storageTypes=[DISK], creationFallbacks=[], replicationFallbacks=[ARCHIVE]}, newBlock=true) For more information, please enable DEBUG log level on org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy
2017-06-02 13:30:38,244 WARN org.apache.hadoop.hdfs.protocol.BlockStoragePolicy: Failed to place enough replicas: expected size is 3 but only 0 storage types can be selected (replication=3, selected=[], unavailable=[DISK], removed=[DISK, DISK, DISK], policy=BlockStoragePolicy{HOT:7, storageTypes=[DISK], creationFallbacks=[], replicationFallbacks=[ARCHIVE]})
2017-06-02 13:30:38,244 WARN org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy: Failed to place enough replicas, still in need of 3 to reach 3 (unavailableStorages=[DISK], storagePolicy=BlockStoragePolicy{HOT:7, storageTypes=[DISK], creationFallbacks=[], replicationFallbacks=[ARCHIVE]}, newBlock=true) All required storage types are unavailable:  unavailableStorages=[DISK], storagePolicy=BlockStoragePolicy{HOT:7, storageTypes=[DISK], creationFallbacks=[], replicationFallbacks=[ARCHIVE]}
2017-06-02 13:30:38,244 INFO org.apache.hadoop.ipc.Server: IPC Server handler 7 on 9000, call org.apache.hadoop.hdfs.protocol.ClientProtocol.addBlock from 10.17.20.211:38046 Call#78805 Retry#0


Interestingly, writes to HDFS have not stopped completely. Records are still being committed to HDFS.

Why does the log say "could only be replicated to 0 nodes"? There are 3 DNs and no node is excluded in this operation. I only have 3 DNs, and all 3 are
running, so why can't the block be placed on any of them?

This error did not recur continuously after its first occurrence. It occurred a few times, and records were still being written at those times, but some records were lost.
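
To see how often the error occurs and what the NameNode reports each time, the message can be pulled apart with a regular expression. The sketch below is only a log-parsing aid and assumes the message wording seen in this thread; other Hadoop versions may phrase it differently. The function name and dictionary keys are made up for illustration. Note that the message gives counts only, not the reason each datanode was rejected; the WARN line in the NameNode log above suggests enabling DEBUG on org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy for that.

```python
import re
from typing import Dict, Optional, Union

# Pattern based on the message text quoted in this thread.
PLACEMENT_FAILURE = re.compile(
    r"File (?P<path>\S+) could only be replicated to (?P<replicated>\d+) nodes "
    r"instead of minReplication \(=(?P<min_replication>\d+)\)\.\s+"
    r"There are (?P<running>\d+) datanode\(s\) running and "
    r"(?P<excluded>no|\d+) node\(s\) are excluded in this operation\."
)


def parse_placement_failure(line: str) -> Optional[Dict[str, Union[str, int]]]:
    """Extract the path and node counts from a placement-failure log line."""
    match = PLACEMENT_FAILURE.search(line)
    if match is None:
        return None
    excluded = match.group("excluded")
    return {
        "path": match.group("path"),
        "replicated": int(match.group("replicated")),
        "min_replication": int(match.group("min_replication")),
        "running": int(match.group("running")),
        # The log prints the word "no" instead of 0.
        "excluded": 0 if excluded == "no" else int(excluded),
    }


sample = (
    "java.io.IOException: File /topics/+tmp/<topic-name>/year=2017/month=06/"
    "day=02/hour=19/133f1ad4-2e03-4355-b16f-4135bf72c26c_tmp could only be "
    "replicated to 0 nodes instead of minReplication (=1).  There are 3 "
    "datanode(s) running and no node(s) are excluded in this operation."
)
print(parse_placement_failure(sample))
```

Applied to every line of the NameNode log, this gives the timestamps and tmp paths of each failure, which can then be matched against the hour partitions with missing records.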

The NameNode also logged the errors below twice, but about 8 hours after the errors above occurred.

2017-06-03 09:57:41,907 INFO BlockStateChange: BLOCK* addToInvalidates: blk_1073743667_2856{UCState=UNDER_CONSTRUCTION, truncateBlock=null, primaryNodeIndex=-1, replicas=[ReplicaUC[[DISK]DS-a937880d-a587-4918-8886-4b6f2373696a:NORMAL:10.17.20.156:50010|RBW], ReplicaUC[[DISK]DS-30b0bb59-f1fd-4bba-af1e-04a833c7d96d:NORMAL:10.17.18.34:50010|RBW], ReplicaUC[[DISK]DS-0de89b54-539b-4593-8bcc-4d638f118f72:NORMAL:10.17.19.140:50010|RBW]]} 10.17.19.140:50010 10.17.18.34:50010 10.17.20.156:50010
2017-06-03 09:57:41,911 INFO org.apache.hadoop.ipc.Server: IPC Server handler 8 on 9000, call org.apache.hadoop.hdfs.protocol.ClientProtocol.updateBlockForPipeline from 10.17.20.211:51670 Call#178059 Retry#0
java.io.IOException: BP-1855314710-10.17.18.100-1496237050533:blk_1073743659_2848 does not exist or is not under Constructionnull
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkUCBlock(FSNamesystem.java:6241)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.updateBlockForPipeline(FSNamesystem.java:6309)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.updateBlockForPipeline(NameNodeRpcServer.java:806)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.updateBlockForPipeline(ClientNamenodeProtocolServerSideTranslatorPB.java:955)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
2017-06-03 09:57:41,938 INFO org.apache.hadoop.ipc.Server: IPC Server handler 6 on 9000, call org.apache.hadoop.hdfs.protocol.ClientProtocol.updateBlockForPipeline from 10.17.20.211:51670 Call#178116 Retry#0



Kafka Connect level:

We found similar error logs:


[2017-06-02 15:30:03,744] ERROR Exception on topic partition Prd_IN_GeneralEvents-269:  (io.confluent.connect.hdfs.TopicPartitionWriter:324)
org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /topics/+tmp/<topic-name>/year=2017/month=06/day=02/hour=20/e41d677b-2f09-48e9-8851-214bd6b91b6

The tmp file names in the NameNode logs are the same as those in the logs on the Kafka Connect nodes (obviously).

Where should I start looking in order to resolve this issue?

sd...@spotx.tv

Jun 26, 2017, 3:31:42 PM
to Confluent Platform

I have the same issue. Have you resolved it already?
 

JPS

Jun 27, 2017, 1:37:28 AM
to Confluent Platform
No. I am yet to find a solution for this issue.
Did you get any?

Ewen Cheslack-Postava

Jun 28, 2017, 1:19:19 AM
to Confluent Platform
One common explanation for this issue is that your datanodes are running out of disk space -- this makes sense given that they cannot store the data but "no node(s) were excluded from this operation". Perhaps try following the steps described here https://stackoverflow.com/a/34250778/841827 to determine if that is the issue?

-Ewen

JPS

Jul 3, 2017, 3:02:30 AM
to Confluent Platform
There is no disk space issue, as I have about 110 GB left on each of my datanodes. I do not see any occurrence of the strings "error" or "exception" in the Kafka Connect logs. But I do see the log below in the NameNode:

java.io.IOException: File /topics/+tmp/testTopic/year=2017/month=07/day=03/hour=03/380f83e3-adaa-4c4a-b195-ccb9ad0c8ddf_tmp could only be replicated to 0 nodes instead of minReplication (=1).  There are 3 datanode(s) running and no node(s) are excluded in this operation.
        at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1571)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3107)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3031)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:725)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:492)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)

My dfsadmin report gives 3 live datanodes with 110 GB remaining in each. 

I am not able to pinpoint what suddenly went wrong.
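
For anyone repeating the disk-space check suggested earlier in the thread, the per-datanode figures can be pulled out of the dfsadmin report text with a short script. This is a rough sketch: it assumes each datanode section of the report has a "Name:" line followed by a "DFS Remaining:" line with a byte count, which may differ between Hadoop versions. The sample report, hostnames, and threshold are hypothetical, and the function names are made up for illustration.

```python
import re
from typing import Dict, List


def remaining_by_datanode(report: str) -> Dict[str, int]:
    """Map each datanode address to its 'DFS Remaining' value in bytes."""
    remaining: Dict[str, int] = {}
    current = None
    for raw in report.splitlines():
        line = raw.strip()
        if line.startswith("Name:"):
            current = line.split()[1]
        elif line.startswith("DFS Remaining:") and current is not None:
            # The cluster-wide summary line is skipped because no
            # "Name:" line has been seen yet at that point.
            match = re.match(r"DFS Remaining:\s*(\d+)", line)
            if match:
                remaining[current] = int(match.group(1))
    return remaining


def below_threshold(remaining: Dict[str, int], min_bytes: int) -> List[str]:
    """Return the datanodes whose remaining space is under min_bytes."""
    return sorted(node for node, free in remaining.items() if free < min_bytes)


# Hypothetical report excerpt; 118111600640 bytes is 110 GB.
sample_report = """\
DFS Remaining: 354334801920 (330 GB)

Live datanodes (3):

Name: 10.17.20.156:50010 (dn1.example.internal)
DFS Remaining: 118111600640 (110 GB)
DFS Remaining%: 73.33%

Name: 10.17.18.34:50010 (dn2.example.internal)
DFS Remaining: 118111600640 (110 GB)
DFS Remaining%: 73.33%

Name: 10.17.19.140:50010 (dn3.example.internal)
DFS Remaining: 118111600640 (110 GB)
DFS Remaining%: 73.33%
"""

free = remaining_by_datanode(sample_report)
print(free)
print(below_threshold(free, min_bytes=1024 ** 3))
```

Since the errors in this thread were intermittent, capturing the report at the moment the error is logged is more informative than a single check afterwards.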

Raul Saez Tapia

Apr 13, 2018, 5:07:25 AM
to Confluent Platform
I have the same issue:


java.io.IOException: File /topics/+tmp/t2/year=2018/month=04/day=13/hour=10/5f736877-41fd-498e-9c62-128b38078920_tmp.parquet could only be replicated to 0 nodes instead of minReplication (=1).  There are 1 datanode(s) running and 1 node(s) are excluded in this operation.

Did you find a solution? Maybe it is an HDFS misconfiguration.

Why has there been no fix in this thread for a year? I would like to know whether this connector works reliably (I still have time to choose a different approach, such as Akka Streams or Spark Streaming).

sd...@spotx.tv

Apr 13, 2018, 10:22:46 AM
to Confluent Platform
My problem was solved by adjusting the Heterogeneous Storage policy of HDFS.

Raul Saez Tapia

Apr 17, 2018, 5:57:22 AM
to Confluent Platform
Thanks sd...@spotx.tv

I finally got it working by removing the network from the equation. My development environment was a bit tricky, with SSH tunnels. Maybe the issue was an OK from the HDFS NameNode that never reached the client side. After deploying into the network where HDFS is running, everything was fine.

=)