Confluent.Kafka With SSL

767 views
Skip to first unread message

shanmugam mahalingam

unread,
Mar 15, 2017, 2:54:34 AM3/15/17
to Confluent Platform
Hi All,

I am implementing Kafka producer and consumer logic with confluent-kafka(Open Source) library in my .Net core application. I have referred this link https://github.com/confluentinc/confluent-kafka-dotnet. My kafka server is configured with ssl on cloud server but I tried with confluent-kafka namespace but I could not find how to configure ssl. Please let me know if anyone knew solution for it.


My configuration is like below,

Dictionary<string, object> config = new Dictionary<string, object>

            {

                { "security.protocol", "ssl"  },

                {"ssl.ca.location","./trust_cert.pem"},

                {"ssl.certificate.location","./client_cert.pem"},

                {"ssl.key.location","./client_cert_key.pem"},

                {"ssl.key.password","23432nrflskasfosdiafofaowr23r34rwf3wf4a34r4wra3fcca34" },

                { "bootstrap.servers", "12.0.0.15:9096,12.0.0.40:9096" },

                { "group.id", "NetcoreKafka" },

                { "default.topic.config", new Dictionary<string, object>

                    {

                        { "auto.offset.reset", "smallest" }

                    }

                }

            }; 


I am calling kafka consumer like below,

using (var consumer = new Consumer<Null, string>(config, null, new StringDeserializer(Encoding.UTF8)))

                {

                    consumer.Assign(new List<TopicPartitionOffset> { new TopicPartitionOffset(topics, 0, 0) });


                    while (true)

                    {

                        Message<Null, string> msg;

                        if (consumer.Consume(out msg))

                        {

                            Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}");

                        }

                    }

                }


I am getting this error,

Message: Attempted to read or write protected memory. This is often an indication that other memory is corrupt.

StackTrace:  at Confluent.Kafka.Impl.LibRdKafka.NativeMethods.rd_kafka_conf_destroy(IntPtr conf)

   at Confluent.Kafka.Impl.SafeKafkaHandle.Create(RdKafkaType type, IntPtr config)

   at Confluent.Kafka.Consumer..ctor(IEnumerable`1 config)

   at Confluent.Kafka.Consumer`2..ctor(IEnumerable`1 config, IDeserializer`1 keyDeserializer, IDeserializer`1 valueDeserializer)

   at KafkaDemo.Program.Main(String[] args) in D:\KafkaDemo\Program.cs:line 60


Please help me to resolve this issue.


Thanks,

Shanmugam M


Reply all
Reply to author
Forward
0 new messages