Duplicate messages being received by Sink Connectors in distributed mode.


Renukaradhya H D

Jan 18, 2017, 2:01:13 AM
to Confluent Platform
Hi,

My set up details:

Confluent 3.0.1.

1) I have Kafka, Zookeeper, and Schema Registry running. I created a topic named "dev.ps_primary_delivery" with 8 partitions.

    I started pushing data to dev.ps_primary_delivery; by the end, the topic held 1.7 million records.
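For reference, I verified the partition count and the record count with the stock Kafka tools, roughly as below. This is a sketch: the ZooKeeper address devmetric.com:2181 is an assumption (it is not in my configs above), and GetOffsetShell prints the log-end offset of each partition, which together sum to the record count.

# confirm the topic has 8 partitions
./bin/kafka-topics --zookeeper devmetric.com:2181 --describe --topic dev.ps_primary_delivery

# print the log-end offset of each partition; the offsets should sum to ~1.7 million
./bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list devmetric.com:9091 --topic dev.ps_primary_delivery --time -1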

2) I started my 2 workers on two different EC2 machines using the command below; both workers use the same configuration.

sh ./bin/connect-distributed ./etc/schema-registry/connect-avro-distributed.properties

The connect-avro-distributed.properties configuration is below:
-------------------------------------------------------------------------------------------------------------------------
bootstrap.servers=devmetric.com:9091
group.id=group9
consumer.max.poll.records=200


key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://devkafka01.com:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://devkafka01.com:8081


internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

schema.registry.url=http://devkafka01.com:8081

key.converter.schemas.enable=true
value.converter.schemas.enable=true

config.storage.topic=connect-configs

offset.storage.topic=connect-offsets

status.storage.topic=connect-statuses
-----------------------------------------------------------------------------------------------------------------------------
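Both workers expose the Connect REST API, so a quick sanity check that each worker is up and that they see the same connectors looks roughly like this (worker1.ec2.internal is a placeholder for my EC2 host, and I am assuming the default rest.port of 8083):

# returns the Connect worker version if the worker is up
curl http://worker1.ec2.internal:8083/

# lists the connectors registered in the cluster (either worker should give the same answer)
curl http://worker1.ec2.internal:8083/connectors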

3) Then I started two connectors using the configuration below:

{
  "name": "deliverydata-connectorJan18-5",
  "config": {
    "connector.class": "com.operative.kafka.connect.sink.DeliverySinkConnector",
    "tasks.max": "2",
    "topics": "dev.ps_primary_delivery",
    "elasticsearch.cluster.name": "ad_metrics_store",
    "elasticsearch.hosts": "devkafka1.com:9300",
    "elasticsearch.bulk.size": "100",
    "tenants": "tenant1"
  }
}

The second connector used the same configuration; only the name differed: "deliverydata-connectorJan18-6".
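I registered each connector by POSTing its JSON to the REST API of one of the workers, along these lines (the file name deliverydata-connector-5.json is just a placeholder for where I saved the JSON above; host and port are the same assumptions as before):

curl -X POST -H "Content-Type: application/json" --data @deliverydata-connector-5.json http://worker1.ec2.internal:8083/connectors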

4) After 1 hour I wanted to see which data each connector was processing, so I started analyzing the logger statements I had added.

Issues:
  • I saw duplicate data: the same offset from the same partition was processed by both connectors. I have attached logs showing which offsets from each partition were processed. In the end, out of 1.7 million total records, each connector processed all 1.7 million instead of the half I expected it to process.
  • Sometimes the same data is received twice by a single connector.
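To help with debugging, here is roughly how the consumer groups can be inspected. As far as I understand, each sink connector consumes under a group named connect-<connector name>, so my two connectors would be in two separate groups; I believe this Kafka version still needs the --new-consumer flag:

# list the consumer groups the brokers know about
./bin/kafka-consumer-groups --new-consumer --bootstrap-server devmetric.com:9091 --list

# show partition assignments and current offsets for one connector's group
./bin/kafka-consumer-groups --new-consumer --bootstrap-server devmetric.com:9091 --describe --group connect-deliverydata-connectorJan18-5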
The logs are at the location below:


Please let me know if I am doing something wrong in the configuration, and please let me know if you need more information.

-Aradhya


Renukaradhya H D

Jan 18, 2017, 8:00:55 AM
to Confluent Platform
Hi,
My question is posted on Stack Overflow; you can find more information in that post.


-Aradhya