HDFS sink connector creating duplicate data, as seen from the file name pattern

Srinivas Yarra

Mar 19, 2021, 7:49:03 AM
to Confluent Platform
Hi All,

I'm using the HDFS sink connector with Hive integration. I installed a 3-node Kafka cluster, and the same number of Connect workers are running. While looking at the Parquet files in the target Hive table location, I noticed some weird behaviour from the HDFS sink connector. The HDFS files are listed below.
We restart the connectors roughly every 12 hours due to a Kerberos issue. Is it possible that previously committed offsets are inserted again while the worker tasks rebalance?
1.   -rw-rw-r--   3 hdfs supergroup    2341466 2021-03-18 06:02 /data/test/input/topics/testdata/day=2021-03-18/hour=04/quarter=4/testdata+2+161638453251+161639013250.parquet
2.   -rw-rw-r--   3 hdfs supergroup    2383185 2021-03-18 06:02 /data/test/input/topics/testdata/day=2021-03-18/hour=04/quarter=4/testdata+2+161638454059+161638453250.parquet ==> data duplication
3.   -rw-rw-r--   3 hdfs supergroup    8725160 2021-03-18 06:03 /data/test/input/topics/testdata/day=2021-03-18/hour=04/quarter=4/testdata+2+161639013251+161639573250.parquet

The 2nd file's start offset (161638454059) and end offset (161638453250), covering about 559,191 records, were inserted as a separate file, yet all of these offsets already fall within the first file's start/end offset range, as shown by its file name.
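To double-check this on the full directory listing, here is a minimal sketch (my own helper names) that parses the HDFS sink file-name pattern `<topic>+<partition>+<startOffset>+<endOffset>.parquet` and flags pairs of files whose offset ranges intersect for the same topic-partition:

```python
import re

# File-name pattern written by the HDFS sink connector:
# <topic>+<partition>+<startOffset>+<endOffset>.parquet
NAME_RE = re.compile(r"^(?P<topic>.+)\+(?P<partition>\d+)\+(?P<start>\d+)\+(?P<end>\d+)\.parquet$")

def parse(name):
    m = NAME_RE.match(name)
    if not m:
        raise ValueError(f"unexpected file name: {name}")
    return (m.group("topic"), int(m.group("partition")),
            int(m.group("start")), int(m.group("end")))

def find_overlaps(names):
    """Return pairs of file names whose offset ranges intersect
    for the same topic-partition (i.e. likely duplicated records)."""
    files = [parse(n) + (n,) for n in names]
    dupes = []
    for i, (t1, p1, s1, e1, n1) in enumerate(files):
        for t2, p2, s2, e2, n2 in files[i + 1:]:
            if (t1, p1) != (t2, p2):
                continue
            # A file whose start offset falls inside another file's
            # [start, end] range re-writes already-flushed records.
            if s1 <= s2 <= e1 or s2 <= s1 <= e2:
                dupes.append((n1, n2))
    return dupes
```

Running this over the three file names above reports exactly the first and second file as an overlapping pair.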

Connector configuration:
  "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
  "flush.size": "560000",
  "connect.hdfs.principal": "hdfs/xxxxxxyz",
  "tasks.max": "3",
  "timezone": "UTC",
  "hive.database": "test",
  "rotate.interval.ms": "-1",
  "locale": "en-us",
  "hive.tableNames": "testdata",
  "format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat",
  "hive.integration": "true",
  "hive.conf.dir": "/etc/kafka-connect/hive",
  "partition.duration.ms": "3600000",
  "hadoop.conf.dir": "/etc/kafka-connect/hadoop",
  "schema.compatibility": "BACKWARD",
  "connect.hdfs.keytab": "/etc/security/kerberos/keytabs/hdfs.keytab",
  "topics": "testdata",
  "hdfs.url": "hdfs://abc/data/test/input",
  "hdfs.authentication.kerberos": "true",
  "hive.metastore.uris": "thrift://xyz-1:9083,thrift://xyz-2:9083",
  "partition.field.name": "day,hour,quarter",
  "partitioner.class": "io.confluent.connect.hdfs.partitioner.FieldPartitioner",
  "name": "test-hdfs_sink_connector-testdata",
  "timestamp.extractor": "RecordField",
  "timestamp.field": "collection_time"
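Since we restart the connectors so often, a small check for failed tasks may also be useful; below is a sketch against the standard Kafka Connect REST endpoint `GET /connectors/<name>/status`, assuming a worker listening at localhost:8083 (adjust for your cluster):

```python
import json
from urllib.request import urlopen

# Assumed Connect worker address; change to match your deployment.
CONNECT_URL = "http://localhost:8083"

def status_url(name, base=CONNECT_URL):
    # GET /connectors/<name>/status is the standard Connect REST endpoint.
    return f"{base}/connectors/{name}/status"

def failed_tasks(status):
    """Given the decoded /status JSON, return the ids of FAILED tasks."""
    return [t["id"] for t in status.get("tasks", []) if t["state"] == "FAILED"]

# Usage against a live worker (requires network access):
# with urlopen(status_url("test-hdfs_sink_connector-testdata")) as resp:
#     print(failed_tasks(json.load(resp)))
```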

Based on the file-name creation pattern, I believe the second file's start and end offsets should already be contained within the first file itself. Correct me if my understanding is wrong.

What could the reasons be? Is anything wrong in my connector configuration?
