How to handle Kafka connection refused Exception


Xiaojing Gong

Apr 14, 2016, 3:49:38 AM
to kafka-clients
Hi,

I want to save the data to Cassandra when Kafka can't publish it. So I wrote the following code, using a Callback to handle the exceptions. However, the callback doesn't receive a connection exception. My question is: where can I handle a Kafka connection exception, such as a refused connection?


public void publishToKafka(String topicName, String msg) throws Exception {

    producerRecord = new ProducerRecord(topicName, msg);

    final long startTime = System.currentTimeMillis();
    // send() is asynchronous: it queues the record and returns immediately
    producer.send(producerRecord, new Callback() {
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e != null) {
                // Publishing failed: fall back to Cassandra
                kafkaFailurePublishServiceImpl.saveToCassandra(producerRecord.topic(), producerRecord.value().toString());
            }
        }
    });
    final long stopTime = System.currentTimeMillis();

    // Measures only the time to queue the record, not the round trip to the broker
    System.out.println("Time Consumed = " + (stopTime - startTime));
}

gerard...@dizzit.com

Apr 14, 2016, 6:14:28 AM
to kafka-clients
You can add a try/catch around the producer.send(producerRecord, callback) call and catch the general runtime KafkaException. It's not very pretty, but it works. I ran into a similar problem when I had a serialisation problem. The callback is only used if the connection and serialisation work and the bits are actually sent.

Xiaojing Gong

Apr 14, 2016, 12:26:23 PM
to kafka-clients
Hi, thank you for replying.

I tried that, but the error is not caught. It just prints the connection refused error message :( and never reaches the catch block.

try {
    producer.send(producerRecord, new Callback() {
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e != null) {
                kafkaFailurePublishServiceImpl.saveToCassandra(producerRecord.topic(), producerRecord.value().toString());
            } else {
                System.out.print("testing");
            }
        }
    });
    final long stopTime = System.currentTimeMillis();

    System.out.println("Time Consumed = " + (stopTime - startTime));
} catch (KafkaException e) {
    kafkaFailurePublishServiceImpl.saveToCassandra(producerRecord.topic(), producerRecord.value().toString());
}
[2016-04-14 11:23:53.420] boot - 2767  WARN [kafka-producer-network-thread | ideal-kafka-adapter] --- Selector: Error in I/O with serena.cof.ds.capitalone.com/10.219.18.189
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at org.apache.kafka.common.network.Selector.poll(Selector.java:238)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
at java.lang.Thread.run(Thread.java:745) 
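
Editor's note: the stack trace above is logged at WARN level from the kafka-producer-network-thread, not from the thread that called send(). That is probably why the catch block never runs: a try/catch only sees exceptions thrown on its own thread. The sketch below reproduces the effect with plain JDK classes and no Kafka dependency. The class name AsyncFailureDemo and the "callback"/"catch" labels are hypothetical and used only for illustration.

```java
import java.net.ConnectException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public final class AsyncFailureDemo {

    /** Reports where the I/O failure was observed: "callback", "catch", or "nowhere". */
    public static String whereIsFailureSeen() throws InterruptedException {
        ExecutorService ioThread = Executors.newSingleThreadExecutor();
        AtomicReference<String> seenIn = new AtomicReference<>("nowhere");
        CountDownLatch done = new CountDownLatch(1);
        try {
            // Stand-in for send(): hands the work to the I/O thread and returns at once.
            ioThread.execute(() -> {
                try {
                    throw new ConnectException("Connection refused");
                } catch (ConnectException e) {
                    // The I/O thread deals with its own failure (log, retry, callback).
                    seenIn.set("callback");
                } finally {
                    done.countDown();
                }
            });
        } catch (RuntimeException e) {
            // Not reached for the I/O failure, which happens on the other thread.
            seenIn.set("catch");
        }
        done.await(5, TimeUnit.SECONDS);
        ioThread.shutdown();
        return seenIn.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(whereIsFailureSeen()); // prints "callback"
    }
}
```

In the real producer the network thread logs the refused connection and keeps retrying, so any failure that does reach application code would arrive later through the Callback or the returned Future, not through the try/catch around send().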

Xiaojing Gong

Apr 14, 2016, 12:27:24 PM
to kafka-clients
[2016-04-14 11:23:53.420] boot - 2767  WARN [kafka-producer-network-thread | ideal-kafka-adapter] --- Selector: Error in I/O with *****
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at org.apache.kafka.common.network.Selector.poll(Selector.java:238)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
at java.lang.Thread.run(Thread.java:745)

gerard...@dizzit.com

Apr 15, 2016, 11:31:53 AM
to kafka-clients
You can add it to the catch with (KafkaException | ConnectException e). But I'm not sure if this is supposed to happen.

Xiaojing Gong

Apr 18, 2016, 6:01:37 PM
to kafka-clients
:(

I am not allowed to add ConnectException here.
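
Editor's note: a likely reason the compiler rejects the multi-catch is that java.net.ConnectException is a checked exception (it extends IOException), and Java does not allow a catch clause for a checked exception that the try block cannot throw. KafkaProducer.send() declares no checked exceptions, while KafkaException is a RuntimeException and is therefore accepted. A minimal sketch of the distinction, using a hypothetical helper named isChecked:

```java
import java.net.ConnectException;

public final class CheckedExceptionCheck {

    /** True when the type is a checked exception (neither RuntimeException nor Error). */
    public static boolean isChecked(Class<? extends Throwable> type) {
        return !RuntimeException.class.isAssignableFrom(type)
            && !Error.class.isAssignableFrom(type);
    }

    public static void main(String[] args) {
        // ConnectException -> SocketException -> IOException -> Exception
        System.out.println(isChecked(ConnectException.class));      // prints "true"
        // Unchecked exceptions may be caught anywhere, even if never thrown
        System.out.println(isChecked(IllegalStateException.class)); // prints "false"
    }
}
```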

HunJae Lee

Aug 30, 2016, 1:48:22 AM
to kafka-clients
I am facing the same situation...
Is there any way to catch the ConnectException?

Prasad Dls

Dec 14, 2016, 1:14:45 PM
to kafka-clients
I am also facing the same issue: I don't get an exception when my broker is down or unreachable.

iba...@bbtv.com

Mar 8, 2017, 2:09:43 PM
to kafka-clients
Same issue over here... isn't there any solution for it?

ke...@databerries.com

Mar 9, 2017, 8:49:20 AM
to kafka-clients
Same issue for me. Any update?
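
Editor's note: the thread ends without an answer. One approach that may help, offered as a sketch and not as a confirmed fix, is to wait on the Future that send() returns with a bounded timeout, and fall back when no acknowledgement arrives in time. The example below uses only JDK classes. BoundedSendDemo and awaitOrFallback are hypothetical names, and a CompletableFuture that never completes stands in for a send to an unreachable broker.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public final class BoundedSendDemo {

    /**
     * Waits up to timeoutMs for the acknowledgement. Returns "published" on success
     * and "fallback" when the wait times out or the send completed with an error.
     */
    public static String awaitOrFallback(Future<?> ack, long timeoutMs) throws InterruptedException {
        try {
            ack.get(timeoutMs, TimeUnit.MILLISECONDS);
            return "published";
        } catch (TimeoutException | ExecutionException e) {
            // Real code would store the record elsewhere here, e.g. saveToCassandra(...)
            return "fallback";
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Assumption: an unreachable broker looks like a future that never completes
        Future<String> neverAcked = new CompletableFuture<>();
        System.out.println(awaitOrFallback(neverAcked, 100)); // prints "fallback"

        Future<String> acked = CompletableFuture.completedFuture("ok");
        System.out.println(awaitOrFallback(acked, 100));      // prints "published"
    }
}
```

Blocking on every send trades throughput for a bounded failure signal, so this fits best where each record must be either published or saved elsewhere before moving on.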