Kafka producer: how to catch ConnectException?

Eric Koston

Nov 16, 2017, 10:12:23 AM
To: Confluent Platform
I want to figure out how to catch the exception that occurs when the Kafka server is not online.
I tried the following:

    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.LongSerializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    
    public class KafkaProducerClient {
    
      private final static String TOPIC = "test";
      private final static String BOOTSTRAP_SERVERS = "localhost:9092";
    
      private static Producer<String, String> createProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer");
        // Keys are Strings (the producer is Producer<String, String>), so use StringSerializer
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class.getName());
        return new KafkaProducer<>(props);
      }
    
      public static void main(String[] args) {
    
    
        final Producer<String, String> producer = createProducer();
        try {
    
          final ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "key", "Hello value ");
          RecordMetadata metadata = producer.send(record).get();
    
        } catch (InterruptedException e) {
          e.printStackTrace();
        } catch (ExecutionException e) {
          e.printStackTrace();
        } catch (Exception e) {
          e.printStackTrace();
        } finally {
          producer.flush();
          producer.close();
        }
      }
    }

I got the following output:

    09:50:59.271 [kafka-producer-network-thread | KafkaExampleProducer] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=KafkaExampleProducer] Node -1 disconnected.
    09:50:59.271 [kafka-producer-network-thread | KafkaExampleProducer] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=KafkaExampleProducer] Connection to node -1 could not be established. Broker may not be available.
    09:50:59.271 [kafka-producer-network-thread | KafkaExampleProducer] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=KafkaExampleProducer] Give up sending metadata request since no node is available
    09:50:59.322 [kafka-producer-network-thread | KafkaExampleProducer] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=KafkaExampleProducer] Give up sending metadata request since no node is available
    09:50:59.373 [kafka-producer-network-thread | KafkaExampleProducer] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=KafkaExampleProducer] Give up sending metadata request since no node is available
    09:50:59.423 [kafka-producer-network-thread | KafkaExampleProducer] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=KafkaExampleProducer] Give up sending metadata request since no node is available
    09:50:59.474 [kafka-producer-network-thread | KafkaExampleProducer] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=KafkaExampleProducer] Give up sending metadata request since no node is available
    09:50:59.525 [kafka-producer-network-thread | KafkaExampleProducer] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=KafkaExampleProducer] Give up sending metadata request since no node is available
    09:50:59.576 [kafka-producer-network-thread | KafkaExampleProducer] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=KafkaExampleProducer] Give up sending metadata request since no node is available
    09:50:59.626 [kafka-producer-network-thread | KafkaExampleProducer] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=KafkaExampleProducer] Give up sending metadata request since no node is available
    09:50:59.677 [kafka-producer-network-thread | KafkaExampleProducer] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=KafkaExampleProducer] Give up sending metadata request since no node is available
    09:50:59.727 [kafka-producer-network-thread | KafkaExampleProducer] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=KafkaExampleProducer] Give up sending metadata request since no node is available
    09:50:59.778 [kafka-producer-network-thread | KafkaExampleProducer] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=KafkaExampleProducer] Give up sending metadata request since no node is available
    09:50:59.829 [kafka-producer-network-thread | KafkaExampleProducer] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=KafkaExampleProducer] Give up sending metadata request since no node is available
    09:50:59.880 [kafka-producer-network-thread | KafkaExampleProducer] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=KafkaExampleProducer] Give up sending metadata request since no node is available
    09:50:59.931 [kafka-producer-network-thread | KafkaExampleProducer] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=KafkaExampleProducer] Give up sending metadata request since no node is available
    09:50:59.982 [kafka-producer-network-thread | KafkaExampleProducer] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=KafkaExampleProducer] Give up sending metadata request since no node is available
    09:51:00.033 [kafka-producer-network-thread | KafkaExampleProducer] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=KafkaExampleProducer] Give up sending metadata request since no node is available
    09:51:00.084 [kafka-producer-network-thread | KafkaExampleProducer] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=KafkaExampleProducer] Initialize connection to node localhost:9092 (id: -1 rack: null) for sending metadata request
    09:51:00.084 [kafka-producer-network-thread | KafkaExampleProducer] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=KafkaExampleProducer] Initiating connection to node localhost:9092 (id: -1 rack: null)
    09:51:00.085 [kafka-producer-network-thread | KafkaExampleProducer] DEBUG org.apache.kafka.common.network.Selector - [Producer clientId=KafkaExampleProducer] Connection with localhost/127.0.0.1 disconnected
    java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50)
    at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:106)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:444)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:398)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
    at java.lang.Thread.run(Thread.java:748) 

As you can see, the exception was never caught by any of my catch blocks.
What am I doing wrong?
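
One possible explanation, offered as a sketch and not a confirmed diagnosis: the log lines above come from the kafka-producer-network-thread and are at DEBUG level, and the stack trace ends in Sender.run and Thread.run. That suggests the ConnectException is raised and handled on that background thread, so it never reaches the try/catch around producer.send(record).get(). The plain-Java example below does not use Kafka at all. BackgroundFailureSketch, sendSwallowingErrors, sendPropagatingErrors and describe are hypothetical names invented for illustration. It only shows the general mechanism: a caller waiting on a Future sees an exception only if the background thread completes the Future with it.

```java
import java.net.ConnectException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BackgroundFailureSketch {

  // Hypothetical stand-in for a client whose I/O thread logs a failure
  // and keeps going: the returned future is never completed.
  static CompletableFuture<String> sendSwallowingErrors() {
    CompletableFuture<String> result = new CompletableFuture<>();
    Thread io = new Thread(() -> {
      try {
        throw new ConnectException("Connection refused");
      } catch (ConnectException e) {
        // Handled on the I/O thread only; the caller never sees it.
        System.err.println("DEBUG (io-thread): " + e);
      }
    }, "io-thread");
    io.setDaemon(true);
    io.start();
    return result;
  }

  // Hypothetical variant where the I/O thread hands the failure to the future.
  static CompletableFuture<String> sendPropagatingErrors() {
    CompletableFuture<String> result = new CompletableFuture<>();
    Thread io = new Thread(
        () -> result.completeExceptionally(new ConnectException("Connection refused")),
        "io-thread");
    io.setDaemon(true);
    io.start();
    return result;
  }

  // Waits for the future with an upper bound and reports what the caller observed.
  static String describe(CompletableFuture<String> future, long timeoutMs) {
    try {
      return "ok: " + future.get(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      return "timed out";
    } catch (ExecutionException e) {
      // The original failure is the cause, not the ExecutionException itself.
      return "failed: " + e.getCause().getClass().getSimpleName();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return "interrupted";
    }
  }

  public static void main(String[] args) {
    System.out.println(describe(sendSwallowingErrors(), 200));
    System.out.println(describe(sendPropagatingErrors(), 2000));
  }
}
```

In this sketch the first call can only time out, while the second surfaces the ConnectException as the cause of an ExecutionException. If the real producer behaves like the first case, bounding the wait (for example with get(timeout, unit), or the producer's max.block.ms setting) and handling the resulting timeout may be the practical way to detect an unreachable broker.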