Avro console consumer giving socket exception in distributed setup

jaidee...@lazada.com

Aug 19, 2015, 6:15:55 AM
to Confluent Platform
Hi all,
I am new to Kafka, so pardon my ignorance!

I have set up a three-node cluster in EC2. I can see that the brokers are up and registered in ZooKeeper.
# in zkCli
ls /brokers/ids
[3, 2, 1]

The broker list and the other settings in all of the config files use the correct IP addresses. I have verified the ZK quorum as well.

The problem I am hitting is that I can produce and consume messages on the same node, but I can't produce on node 1 and consume on node 2.

This is the error I get when I produce a message on node 1 and run the console consumer on node 2:

[2015-08-19 10:05:22,735] ERROR Error processing message, stopping consumer:  (kafka.tools.ConsoleConsumer$:103)
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 61
Caused by: java.net.ConnectException: Connection refused
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:579)
at java.net.Socket.connect(Socket.java:528)
at sun.net.NetworkClient.doConnect(NetworkClient.java:180)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:432)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:527)
at sun.net.www.http.HttpClient.<init>(HttpClient.java:211)
at sun.net.www.http.HttpClient.New(HttpClient.java:308)
at sun.net.www.http.HttpClient.New(HttpClient.java:326)
at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:997)
at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:933)
at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:851)
at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1301)
at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:468)
at io.confluent.kafka.schemaregistry.client.rest.utils.RestUtils.httpRequest(RestUtils.java:134)
at io.confluent.kafka.schemaregistry.client.rest.utils.RestUtils.getId(RestUtils.java:228)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:56)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getByID(CachedSchemaRegistryClient.java:100)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:49)
at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:117)
at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:109)
at kafka.tools.ConsoleConsumer$$anonfun$main$1.apply(ConsoleConsumer.scala:168)
at kafka.tools.ConsoleConsumer$$anonfun$main$1.apply(ConsoleConsumer.scala:166)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at kafka.consumer.KafkaStream.foreach(KafkaStream.scala:25)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:166)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)

Is this because of some configuration issue? I have checked that the port on node 1 is reachable from node 2 using telnet.

Thanks,
Jaideep

Roger Hoover

Aug 19, 2015, 11:28:57 AM
to confluent...@googlegroups.com
Jaideep,

Do you have the schema registry running? I think that's where the connection is being refused.

Cheers,

Roger
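
The stack trace passes through RestUtils.httpRequest in the schema registry client, so the refused connection is an HTTP call to the registry rather than a connection to a broker. One way to check this from the consumer node is a plain TCP connect to whichever address the consumer is using for the registry. The sketch below is only an illustration in Python: the function name is made up, and the host names and port 8081 in the usage comments are placeholders, not values confirmed for this cluster.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        # create_connection resolves the host and attempts a TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers "Connection refused", timeouts, and name resolution failures.
        return False


# Example usage on the consumer node (placeholder addresses, assumptions only):
#   can_connect("localhost", 8081)             # is a registry listening locally?
#   can_connect("schema-registry-host", 8081)  # is the remote registry reachable?
```

If the local check fails and the remote one succeeds, the consumer is probably pointing at the wrong registry address rather than hitting a network problem.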

 

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platf...@googlegroups.com.
To post to this group, send email to confluent...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/67209c91-fdae-449c-87f5-ad96deab08ef%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Jaideep Datta Dhok

Aug 19, 2015, 10:17:02 PM
to confluent...@googlegroups.com
Hi Roger,
Thanks for the reply. The schema registry was up, but I wasn't passing 'schema.registry.url' on the command line. I thought the console consumer would pick it up from ZooKeeper.

This worked:
kafka-avro-console-consumer --topic test --zookeeper zk1,zk2,zk3 --from-beginning --property schema.registry.url="http://schema-registry-host:8081"

Thanks,
Jaideep
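
For anyone wondering why the consumer needs the registry URL at all: the "id 61" in the error is a schema ID. In the Confluent Avro wire format, each message begins with a magic byte of 0 followed by a 4-byte big-endian schema ID, and the deserializer fetches that schema from the registry over HTTP (the /schemas/ids/<id> endpoint) before it can decode the payload. The sketch below only illustrates that lookup in Python; the function names and the sample message are hypothetical, and the host is the placeholder from the command above.

```python
import struct

MAGIC_BYTE = 0  # first byte of a message in the Confluent Avro wire format


def schema_id_from_message(message: bytes) -> int:
    """Return the schema ID stored in bytes 1-4 of a serialized message."""
    if len(message) < 5:
        raise ValueError("message too short to carry a schema ID header")
    # ">bI" = big-endian, 1 signed byte (magic) + 4-byte unsigned int (schema ID).
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id


def schema_lookup_url(registry_url: str, schema_id: int) -> str:
    """Build the registry URL that a consumer would request for this schema ID."""
    return f"{registry_url.rstrip('/')}/schemas/ids/{schema_id}"


# Hypothetical message: header for schema ID 61 followed by dummy payload bytes.
example = b"\x00" + (61).to_bytes(4, "big") + b"avro-bytes"
print(schema_lookup_url("http://schema-registry-host:8081",
                        schema_id_from_message(example)))
# prints http://schema-registry-host:8081/schemas/ids/61
```

Without an explicit schema.registry.url, that request has to go to whatever address the tool falls back to. If that fallback is a local address (an assumption here, not something confirmed in this thread), it would explain why consuming worked on the node where the registry runs and was refused on the others.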


Roger Hoover

Aug 19, 2015, 11:22:19 PM
to confluent...@googlegroups.com
Great. Glad you got it figured out.


Piyush Muthal

Jun 7, 2016, 9:32:17 AM
to Confluent Platform
Thank you, Jaideep. This helped me a lot. Can you please tell me how I can now use the Avro consumer in a Spark application, for Spark Streaming integration with Kafka?