Replicator duplicates all messages every ~15 minutes


Victor Merlis

Jul 14, 2020, 4:51:41 AM
to Confluent Platform
Hi,

I'm trying to migrate data from one cluster to another using Replicator. All topics were created with the source configuration and the data replicated successfully, but I'm seeing some strange behavior: every ~15 minutes, all the data from the source cluster is replicated again to the destination cluster. I suspect a misconfiguration (consumer, producer, and replicator configurations are attached below).

Logs:
The following log line is printed each time the duplication occurs (only one line is attached here, but the same message appears for every existing topic):
[2020-07-14 08:18:15,003] INFO [Consumer clientId=replicator-0, groupId=replicator] Resetting offset for partition testTopic-0 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:601)
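
For reference, one way to check whether the replicator group is actually committing offsets on the source cluster is to describe it between duplication cycles (a quick sketch, assuming the standard Kafka CLI tools are on the PATH and the group id is "replicator" as configured below):
kafka-consumer-groups --bootstrap-server sourceKafkaEndpoint:9092 --describe --group replicator
If CURRENT-OFFSET drops back to 0 right before a duplication cycle, the committed offsets are being lost or ignored, rather than anything being wrong on the producer side.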

Consumer Config:
bootstrap.servers=sourceKafkaEndpoint:9092

Producer Config:
bootstrap.servers=destKafkaEndpoint:9092

Replicator Config:
confluent.topic.bootstrap.servers=sourceKafkaEndpoint:9092
offset.start=consumer
topic.config.sync=false

Executable Command:
replicator --consumer.config consumer.properties --producer.config producer.properties --replication.config replication.properties --topic.regex ".*" --cluster.id replicator
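
With offset.start=consumer (as set above), Replicator resumes from whatever offsets the replicator consumer group has committed on the source cluster. As an experiment, the offsets could be managed by the Connect framework instead; a sketch of the changed line in replication.properties, assuming that value is supported by the Replicator version in use:
offset.start=connect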

Kafka Info:
Source cluster Kafka version: 2.0.1-cp4
Destination cluster Kafka version: 2.4.1

Source Cluster:
  • 3 Brokers
  • 3 Zookeepers
  • ~700 Topics
Destination Cluster:
  • 3 Brokers
  • 3 Zookeepers
  • A new cluster that contains only one topic (__consumer_offsets)
Consumer Config Values: 
auto.offset.reset = none
bootstrap.servers = [destKafkaEndpoint:9092]
check.crcs = true
client.id = replicator-0
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.min.bytes = 1
group.id = replicator
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.records = 500
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
 (org.apache.kafka.clients.consumer.ConsumerConfig:279)

Any idea why it's happening?

Victor Merlis

Jul 14, 2020, 6:03:56 AM
to Confluent Platform
UPDATE:
Found that this happens each time a new topic is created on the source cluster.
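
A simple way to reproduce it on demand is to create a throwaway topic on the source cluster and watch the logs (a sketch; the topic name is just a placeholder, and older CLI tools may need --zookeeper instead of --bootstrap-server):
kafka-topics --create --bootstrap-server sourceKafkaEndpoint:9092 --topic replicator-repro --partitions 1 --replication-factor 1
Shortly afterwards the "Resetting offset ... to offset 0" lines appear for every replicated topic, not just the new one.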

Victor Merlis

Jul 14, 2020, 6:25:44 AM
to Confluent Platform
Additional Logs:
[2020-07-14 10:04:50,527] INFO Finished creating connector replicator (org.apache.kafka.connect.runtime.Worker:257)
[2020-07-14 10:04:50,527] INFO SourceConnectorConfig values: 
config.action.reload = restart
connector.class = io.confluent.connect.replicator.ReplicatorSourceConnector
errors.log.enable = false
errors.log.include.messages = false
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = class io.confluent.connect.replicator.util.ByteArrayConverter
name = replicator
tasks.max = 1
transforms = []
value.converter = class io.confluent.connect.replicator.util.ByteArrayConverter
[2020-07-14 10:04:50,527] INFO EnrichedConnectorConfig values: 
config.action.reload = restart
connector.class = io.confluent.connect.replicator.ReplicatorSourceConnector
errors.log.enable = false
errors.log.include.messages = false
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = class io.confluent.connect.replicator.util.ByteArrayConverter
name = replicator
tasks.max = 1
transforms = []
value.converter = class io.confluent.connect.replicator.util.ByteArrayConverter
[2020-07-14 10:04:50,550] INFO ReplicatorSourceTaskConfig values: 
confluent.license = 
confluent.topic = _confluent-command
dest.kafka.bootstrap.servers = [destKafkaDNS:9092]
dest.kafka.metric.reporters = []
dest.kafka.metrics.num.samples = 2
dest.kafka.receive.buffer.bytes = 65536
dest.kafka.send.buffer.bytes = 131072
dest.zookeeper.connect = 
offset.timestamps.commit = true
offset.topic.commit = true
offset.translator.tasks.max = -1
offset.translator.tasks.separate = false
partition.assignment = aVeryLongHash...=
provenance.header.enable = false
provenance.header.filter.overrides = 
src.consumer.check.crcs = true
src.consumer.fetch.max.bytes = 52428800
src.consumer.fetch.min.bytes = 1
src.consumer.interceptor.classes = []
src.consumer.max.partition.fetch.bytes = 1048576
src.consumer.max.poll.records = 500
src.header.converter = class io.confluent.connect.replicator.util.ByteArrayConverter
src.kafka.bootstrap.servers = [sourceKafkaDNS:9092]
src.kafka.metric.reporters = []
src.kafka.metrics.num.samples = 2
src.kafka.receive.buffer.bytes = 65536
src.kafka.sasl.client.callback.handler.class = null
src.kafka.sasl.jaas.config = null
src.kafka.sasl.kerberos.kinit.cmd = /usr/bin/kinit
src.kafka.sasl.kerberos.min.time.before.relogin = 60000
src.kafka.sasl.kerberos.ticket.renew.jitter = 0.05
src.kafka.sasl.kerberos.ticket.renew.window.factor = 0.8
src.kafka.sasl.login.callback.handler.class = null
src.kafka.sasl.login.class = null
src.kafka.sasl.login.refresh.buffer.seconds = 300
src.kafka.sasl.login.refresh.min.period.seconds = 60
src.kafka.sasl.login.refresh.window.factor = 0.8
src.kafka.sasl.login.refresh.window.jitter = 0.05
src.kafka.sasl.mechanism = GSSAPI
src.kafka.security.protocol = PLAINTEXT
src.kafka.send.buffer.bytes = 131072
src.kafka.ssl.cipher.suites = null
src.kafka.ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
src.kafka.ssl.endpoint.identification.algorithm = https
src.kafka.ssl.key.password = null
src.kafka.ssl.keymanager.algorithm = SunX509
src.kafka.ssl.keystore.location = null
src.kafka.ssl.keystore.password = null
src.kafka.ssl.keystore.type = JKS
src.kafka.ssl.protocol = TLS
src.kafka.ssl.provider = null
src.kafka.ssl.secure.random.implementation = null
src.kafka.ssl.trustmanager.algorithm = PKIX
src.kafka.ssl.truststore.location = null
src.kafka.ssl.truststore.password = null
src.kafka.ssl.truststore.type = JKS
src.key.converter = class io.confluent.connect.replicator.util.ByteArrayConverter
src.value.converter = class io.confluent.connect.replicator.util.ByteArrayConverter
src.zookeeper.connect = 
task.id = replicator-0
topic.auto.create = true
topic.blacklist = []
topic.config.sync = false
topic.preserve.partitions = true
topic.regex = .*
topic.rename.format = ${topic}
topic.timestamp.type = CreateTime
topic.whitelist = []


config.action.reload = restart looks suspicious :\
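
Since the re-replication seems to be triggered by new topics being picked up, one workaround to try while debugging is to replicate an explicit list of topics instead of topic.regex=.*, so newly created topics don't trigger it. A sketch for replication.properties (the topic names are placeholders):
topic.whitelist=testTopic,anotherTopic
Dropping the --topic.regex ".*" flag from the replicator command should then mean only the whitelisted topics are replicated.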

Victor Merlis

Jul 16, 2020, 11:36:41 AM
to Confluent Platform
The issue was fixed by upgrading to Confluent Kafka 5.5.1.