MirrorMaker not replicating all topics

134 views
Skip to first unread message

Stuart Wong

unread,
Sep 8, 2015, 4:57:36 PM9/8/15
to Confluent Platform
Hi,

We have a 3 node source Kafka cluster in our local DC, and a remote destination Kafka cluster in AWS with MirrorMaker doing replication from local to AWS. Things look good, except that only a single topic got replicated, and there are errors in the MirrorMaker logs:

Sep 08 20:44:27 myhost.local.dev bash[21047]: [2015-09-08 20:44:27,540] WARN Fetching topic metadata with correlation id 9596 for topics [Set(solarwinds-interface-traffic, remedy_base, remedy_rel, remedy_log)] from broker [id:1,host:myhost.local.dev,port:9092] failed (kafka.client.ClientUtils$)
Sep 08 20:44:27 myhost.local.dev bash[21047]: java.nio.channels.ClosedChannelException
Sep 08 20:44:27 myhost.local.dev bash[21047]: at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
Sep 08 20:44:27 myhost.local.dev bash[21047]: at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
Sep 08 20:44:27 myhost.local.dev bash[21047]: at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
Sep 08 20:44:27 myhost.local.dev bash[21047]: at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
Sep 08 20:44:27 myhost.local.dev bash[21047]: at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
Sep 08 20:44:27 myhost.local.dev bash[21047]: at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
Sep 08 20:44:27 myhost.local.dev bash[21047]: at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
Sep 08 20:44:27 myhost.local.dev bash[21047]: at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
Sep 08 20:44:27 myhost.local.dev bash[21047]: [2015-09-08 20:44:27,544] WARN [mm-1_stluengtst01.monsanto.com-1441743044535-dc0a32f0-leader-finder-thread], Failed to add leader for partitions [solarwinds-interface-traffic,0],[remedy_base,0],[solarwinds-interface-traffic,3],[remedy_log,0],[solarwinds-interface-traffic,1],[remedy_rel,0],[solarwinds-interface-traffic,2]; will retry (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
Sep 08 20:44:27 myhost.local.dev bash[21047]: java.nio.channels.ClosedChannelException
Sep 08 20:44:27 myhost.local.dev bash[21047]: at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
Sep 08 20:44:27 myhost.local.dev bash[21047]: at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
Sep 08 20:44:27 myhost.local.dev bash[21047]: at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
Sep 08 20:44:27 myhost.local.dev bash[21047]: at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:127)
Sep 08 20:44:27 myhost.local.dev bash[21047]: at kafka.consumer.SimpleConsumer.earliestOrLatestOffset(SimpleConsumer.scala:166)
Sep 08 20:44:27 myhost.local.dev bash[21047]: at kafka.consumer.ConsumerFetcherThread.handleOffsetOutOfRange(ConsumerFetcherThread.scala:60)
Sep 08 20:44:27 myhost.local.dev bash[21047]: at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:177)
Sep 08 20:44:27 myhost.local.dev bash[21047]: at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:172)
Sep 08 20:44:27 myhost.local.dev bash[21047]: at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778)
Sep 08 20:44:27 myhost.local.dev bash[21047]: at scala.collection.immutable.Map$Map3.foreach(Map.scala:161)
Sep 08 20:44:27 myhost.local.dev bash[21047]: at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777)
Sep 08 20:44:27 myhost.local.dev bash[21047]: at kafka.server.AbstractFetcherThread.addPartitions(AbstractFetcherThread.scala:172)
Sep 08 20:44:27 myhost.local.dev bash[21047]: at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:87)
Sep 08 20:44:27 myhost.local.dev bash[21047]: at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:77)
Sep 08 20:44:27 myhost.local.dev bash[21047]: at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778)
Sep 08 20:44:27 myhost.local.dev bash[21047]: at scala.collection.immutable.Map$Map3.foreach(Map.scala:161)
Sep 08 20:44:27 myhost.local.dev bash[21047]: at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777)
Sep 08 20:44:27 myhost.local.dev bash[21047]: at kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:77)
Sep 08 20:44:27 myhost.local.dev bash[21047]: at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:95)
Sep 08 20:44:27 myhost.local.dev bash[21047]: at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
--------------

Kafka: 0.8.2.1 (2.11.5)

Any ideas what these errors are about and how to resolve?

Thanks.
Reply all
Reply to author
Forward
0 new messages