Kafka with ssl is not working

797 views
Skip to first unread message

sounthar cs

unread,
Dec 16, 2019, 1:52:31 AM12/16/19
to golang-nuts
Hi All,

I am trying to consume data from the existing kafka server. 

I am able to consume it successfully through python code.

But I want the same to work with Go.


I have tried sarama, confluent-kafka-go, and github.com/segmentio/kafka-go.

With sarama (code below), I am getting the following error:

2019/12/16 06:28:32 client/metadata fetching metadata for all topics from broker *.*.*.*:9093

2019/12/16 06:28:32 Failed to connect to broker *.*.*.*:9093: tls: first record does not look like a TLS handshake

2019/12/16 06:28:32 client/metadata got error from broker -1 while fetching metadata: tls: first record does not look like a TLS handshake

2019/12/16 06:28:32 client/metadata no available broker to send metadata request to

2019/12/16 06:28:32 client/brokers resurrecting 3 dead seed brokers

2019/12/16 06:28:32 Closing Client

2019/12/16 06:28:32 unable to create kafka client: "kafka: client has run out of available brokers to talk to (Is your cluster reachable?)"


 

package main


// Run with:

// go build examples/base-client/*.go

// ./base-client


import (

"crypto/tls"

"crypto/x509"

"io/ioutil"

"log"

"os"

"os/signal"

"sync"


"github.com/Shopify/sarama"

)


// main builds a TLS-enabled sarama client for the configured brokers, creates
// a consumer on top of it and hands that consumer to consumerLoop for the
// topic "rlcmData__ONOS__CTC31". Both the consumer and the client are closed
// when consumerLoop returns.
func main() {
	logger := log.New(os.Stderr, "", log.LstdFlags)
	sarama.Logger = logger

	tlsConfig, err := NewTLSConfig("bo-rsa.pem",
		"bo-rsa.key",
		"ca.pem")
	if err != nil {
		log.Fatal(err)
	}
	// This can be used on test server if domain does not match cert:
	// tlsConfig.InsecureSkipVerify = true

	consumerConfig := sarama.NewConfig()
	consumerConfig.Net.TLS.Enable = true
	consumerConfig.Net.TLS.Config = tlsConfig

	client, err := sarama.NewClient([]string{"*.*.*.*:9093", "*.*.*.*:9093", "*.*.*.*:9093"}, consumerConfig)
	if err != nil {
		log.Fatalf("unable to create kafka client: %q", err)
	}

	consumer, err := sarama.NewConsumerFromClient(client)
	if err != nil {
		// log.Fatal exits without running deferred calls, so release the
		// client explicitly before bailing out.
		if cerr := client.Close(); cerr != nil {
			log.Println(cerr)
		}
		log.Fatal(err)
	}
	defer func() {
		// A consumer created from an existing client does not own it: close
		// the consumer first, then the underlying client.
		if cerr := consumer.Close(); cerr != nil {
			log.Println(cerr)
		}
		if cerr := client.Close(); cerr != nil {
			log.Println(cerr)
		}
	}()

	consumerLoop(consumer, "rlcmData__ONOS__CTC31")
}


// NewTLSConfig generates a client-side TLS configuration used to authenticate
// on a server with certificates.
//
// clientCertFile and clientKeyFile are the paths of the PEM-encoded client
// certificate and its private key; caCertFile is the path of the PEM-encoded
// CA certificate(s).
//
// It returns an error (and a nil config) when the key pair cannot be loaded,
// when the CA file cannot be read, or when the CA file does not contain at
// least one parsable PEM certificate.
func NewTLSConfig(clientCertFile, clientKeyFile, caCertFile string) (*tls.Config, error) {
	// Load client cert
	cert, err := tls.LoadX509KeyPair(clientCertFile, clientKeyFile)
	if err != nil {
		return nil, err
	}

	// Load CA cert
	caCert, err := ioutil.ReadFile(caCertFile)
	if err != nil {
		return nil, err
	}
	caCertPool := x509.NewCertPool()
	// AppendCertsFromPEM reports whether any certificate was added; an empty
	// pool would otherwise go unnoticed until the handshake.
	if !caCertPool.AppendCertsFromPEM(caCert) {
		return nil, &os.PathError{Op: "parse CA certificates", Path: caCertFile, Err: os.ErrInvalid}
	}

	return &tls.Config{
		Certificates: []tls.Certificate{cert},
		RootCAs:      caCertPool,
		// NOTE(review): with InsecureSkipVerify set, crypto/tls accepts any
		// server certificate and host name, so RootCAs is not enforced during
		// the handshake. Kept to preserve existing behavior; drop it once the
		// broker certificate matches the address used to connect.
		InsecureSkipVerify: true,
	}, nil
}


// consumerLoop consumes every partition of topic concurrently, one goroutine
// per partition, and blocks until all of them have returned. An os.Interrupt
// (SIGINT) stops all partition consumers.
func consumerLoop(consumer sarama.Consumer, topic string) {
	partitions, err := consumer.Partitions(topic)
	if err != nil {
		log.Println("unable to fetch partition IDs for the topic", topic, err)
		return
	}

	// Trap SIGINT to trigger a shutdown.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)
	defer signal.Stop(signals)

	// A single signal value can only be received by one goroutine, so the
	// interrupt is turned into a broadcast by closing a separate channel that
	// every partition consumer selects on.
	shutdown := make(chan os.Signal)
	finished := make(chan struct{})
	defer close(finished) // releases the watcher when no signal ever arrives
	go func() {
		select {
		case <-signals:
			close(shutdown)
		case <-finished:
		}
	}()

	var wg sync.WaitGroup
	// Range over the partition IDs themselves, not their slice indices, and
	// pass the ID as an argument so each goroutine gets its own copy.
	for _, partition := range partitions {
		wg.Add(1)
		go func(partition int32) {
			defer wg.Done()
			consumeTopicPartition(consumer, topic, partition, shutdown)
		}(partition)
	}
	wg.Wait()
}

// consumePartition consumes the given partition of the topic "test" until a
// value is received on signals or signals is closed. It is kept with its
// original signature; it delegates to consumeTopicPartition.
func consumePartition(consumer sarama.Consumer, partition int32, signals chan os.Signal) {
	consumeTopicPartition(consumer, "test", partition, signals)
}

// consumeTopicPartition logs every message arriving on one partition of topic,
// starting at the newest offset. It returns when a value is received on
// signals, when signals is closed, or when the message channel is closed, and
// then logs how many messages were consumed.
func consumeTopicPartition(consumer sarama.Consumer, topic string, partition int32, signals chan os.Signal) {
	log.Println("Receving on partition", partition)
	partitionConsumer, err := consumer.ConsumePartition(topic, partition, sarama.OffsetNewest)
	if err != nil {
		log.Println(err)
		return
	}
	defer func() {
		if err := partitionConsumer.Close(); err != nil {
			log.Println(err)
		}
	}()

	consumed := 0
ConsumerLoop:
	for {
		select {
		case msg, ok := <-partitionConsumer.Messages():
			if !ok {
				// Channel closed: a receive would otherwise yield a nil
				// message and dereferencing it would panic.
				break ConsumerLoop
			}
			log.Printf("Consumed message offset %d\nData: %s\n", msg.Offset, msg.Value)
			consumed++
		case <-signals:
			break ConsumerLoop
		}
	}
	log.Printf("Consumed: %d\n", consumed)
}

Tamás Gulácsi

unread,
Dec 16, 2019, 3:31:31 AM12/16/19
to golang-nuts
Are you sure that the Kafka instance you're connecting to actually uses TLS?
Can `openssl s_client -connect ...:9093` connect?

sounthar cs

unread,
Dec 16, 2019, 5:42:52 AM12/16/19
to golang-nuts
Yes, I am able to connect using that command.

The server is working fine with python code.

# Reference consumer using kafka-python: connects to the brokers over SSL with
# a client certificate and prints every message received on the subscribed
# topic. The single-argument print(...) calls behave the same on Python 2 and 3.
from kafka import KafkaProducer, KafkaConsumer
from time import gmtime, strftime
import io

# print("Start 'test'" + strftime("%Y-%m-%d %H:%M:%S", gmtime()))

# Comma-separated list of host:port broker addresses (redacted).
broker_string = "*********************"
kafka_brokers_list = broker_string.split(',')
print(kafka_brokers_list)

print("Creating consumer")
consumer = KafkaConsumer(bootstrap_servers=kafka_brokers_list,
                         group_id='test',
                         security_protocol='SSL',
                         ssl_check_hostname=False,
                         ssl_cafile='ca.pem',
                         ssl_certfile='bo-rsa.pem',
                         ssl_keyfile='bo-rsa.key')

# Listing the topics shows that the SSL connection works.
print(consumer.topics())

consumer.subscribe(['*****************'])

# Iterating over the consumer yields messages as they arrive.
for msg in consumer:
    # buf2 = io.BytesIO(msg.value)
    print(msg)
    # consumer.commit()
    # print(load(buf2, avro.schema))

Reply all
Reply to author
Forward
0 new messages