I have setup a three node cluster in EC2. I can see that the brokers are up and registered in zookeeper.
In all of the config, broker list etc is also configured with correct IP addresses. I have verified the ZK quorum as well.
The problem I am hitting is that I can produce and read messages from the same node, but can't produce to node 1 and consume from node 2.
The error I get is: (when I produced the message on node 1, and ran the console consumer on node 2)
[2015-08-19 10:05:22,735] ERROR Error processing message, stopping consumer: (kafka.tools.ConsoleConsumer$:103)
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 61
Caused by: java.net.ConnectException: Connection refused
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:579)
at java.net.Socket.connect(Socket.java:528)
at sun.net.NetworkClient.doConnect(NetworkClient.java:180)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:432)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:527)
at sun.net.www.http.HttpClient.<init>(HttpClient.java:211)
at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:997)
at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:933)
at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:851)
at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1301)
at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:468)
at io.confluent.kafka.schemaregistry.client.rest.utils.RestUtils.httpRequest(RestUtils.java:134)
at io.confluent.kafka.schemaregistry.client.rest.utils.RestUtils.getId(RestUtils.java:228)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:56)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getByID(CachedSchemaRegistryClient.java:100)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:49)
at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:117)
at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:109)
at kafka.tools.ConsoleConsumer$$anonfun$main$1.apply(ConsoleConsumer.scala:168)
at kafka.tools.ConsoleConsumer$$anonfun$main$1.apply(ConsoleConsumer.scala:166)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at kafka.consumer.KafkaStream.foreach(KafkaStream.scala:25)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:166)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
[2015-08-19 10:05:22,735] ERROR Error processing message, stopping consumer: (kafka.tools.ConsoleConsumer$:103)
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 61
Caused by: java.net.ConnectException: Connection refused
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:579)
at java.net.Socket.connect(Socket.java:528)
at sun.net.NetworkClient.doConnect(NetworkClient.java:180)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:432)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:527)
at sun.net.www.http.HttpClient.<init>(HttpClient.java:211)
at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:997)
at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:933)
at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:851)
at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1301)
at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:468)
at io.confluent.kafka.schemaregistry.client.rest.utils.RestUtils.httpRequest(RestUtils.java:134)
at io.confluent.kafka.schemaregistry.client.rest.utils.RestUtils.getId(RestUtils.java:228)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:56)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getByID(CachedSchemaRegistryClient.java:100)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:49)
at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:117)
at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:109)
at kafka.tools.ConsoleConsumer$$anonfun$main$1.apply(ConsoleConsumer.scala:168)
at kafka.tools.ConsoleConsumer$$anonfun$main$1.apply(ConsoleConsumer.scala:166)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at kafka.consumer.KafkaStream.foreach(KafkaStream.scala:25)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:166)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Is this because of some configuration issue? I have checked that the port on node 1 is reachable from node 2 using telnet.