I am implementing Kafka producer and consumer logic with the confluent-kafka-dotnet (open source) library in my .NET Core application, following https://github.com/confluentinc/confluent-kafka-dotnet. My Kafka broker on a cloud server is configured with SSL, but I could not find how to configure SSL through the Confluent.Kafka namespace. Please let me know if anyone knows a solution.
My configuration is as follows:
Dictionary<string, object> config = new Dictionary<string, object>
{
    { "security.protocol", "ssl" },
    { "ssl.ca.location", "./trust_cert.pem" },
    { "ssl.certificate.location", "./client_cert.pem" },
    { "ssl.key.location", "./client_cert_key.pem" },
    { "ssl.key.password", "23432nrflskasfosdiafofaowr23r34rwf3wf4a34r4wra3fcca34" },
    { "bootstrap.servers", "12.0.0.15:9096,12.0.0.40:9096" },
    { "group.id", "NetcoreKafka" },
    { "default.topic.config", new Dictionary<string, object>
        {
            { "auto.offset.reset", "smallest" }
        }
    }
};
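In case it helps with diagnosis: librdkafka supports a debug configuration property that prints verbose logs (including SSL handshake details) to stderr. A sketch of adding it to the config above, assuming the .NET wrapper passes the property through to librdkafka unchanged:

```csharp
// Sketch: same config as above, plus librdkafka's "debug" property
// to get verbose security/broker logging during connection setup.
Dictionary<string, object> debugConfig = new Dictionary<string, object>
{
    { "security.protocol", "ssl" },
    { "debug", "security,broker" }, // librdkafka debug contexts
    // ...remaining ssl.*, bootstrap.servers, and group.id settings as above...
};
```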
I am calling the Kafka consumer like below:
using (var consumer = new Consumer<Null, string>(config, null, new StringDeserializer(Encoding.UTF8)))
{
    consumer.Assign(new List<TopicPartitionOffset> { new TopicPartitionOffset(topics, 0, 0) });
    while (true)
    {
        Message<Null, string> msg;
        if (consumer.Consume(out msg))
        {
            Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}");
        }
    }
}
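To narrow the problem down, here is a minimal sketch that constructs the consumer with only the non-SSL settings. My assumption is that if this succeeds, the crash is tied to the ssl.* properties (or to the bundled native librdkafka lacking SSL support) rather than to the consumer code itself:

```csharp
// Minimal config without any ssl.* properties, to check whether
// consumer construction alone succeeds and isolate the SSL settings
// as the cause of the AccessViolationException.
var minimalConfig = new Dictionary<string, object>
{
    { "bootstrap.servers", "12.0.0.15:9096,12.0.0.40:9096" },
    { "group.id", "NetcoreKafka" }
};

using (var consumer = new Consumer<Null, string>(minimalConfig, null, new StringDeserializer(Encoding.UTF8)))
{
    Console.WriteLine("Consumer constructed without SSL settings.");
}
```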
I am getting this error:
Message: Attempted to read or write protected memory. This is often an indication that other memory is corrupt.
StackTrace: at Confluent.Kafka.Impl.LibRdKafka.NativeMethods.rd_kafka_conf_destroy(IntPtr conf)
at Confluent.Kafka.Impl.SafeKafkaHandle.Create(RdKafkaType type, IntPtr config)
at Confluent.Kafka.Consumer..ctor(IEnumerable`1 config)
at Confluent.Kafka.Consumer`2..ctor(IEnumerable`1 config, IDeserializer`1 keyDeserializer, IDeserializer`1 valueDeserializer)
at KafkaDemo.Program.Main(String[] args) in D:\KafkaDemo\Program.cs:line 60
Please help me to resolve this issue.
Thanks,
Shanmugam M