Hi,
I am using the Kafka Connect RabbitMQ connector. What I intend on doing is having Kafka consume from a queue in RabbitMQ.
The error message I am receiving is this:
[2018-12-18 16:52:56,127] ERROR Consumer io.confluent.connect.rabbitmq.ConnectConsumer@615683e8 (amq.ctag-OXT3fZ-b5QWdjL4L3UUAzA) method handleDelivery for channel AMQChannel(amqp://
ad...@172.31.43.51:5672/master,1) threw an exception for channel AMQChannel(amqp://
ad...@172.31.43.51:5672/master,1) (com.rabbitmq.client.impl.ForgivingExceptionHandler:124)
java.lang.NullPointerException
at io.confluent.connect.rabbitmq.MessageConverter.basicProperties(MessageConverter.java:127)
at io.confluent.connect.rabbitmq.SourceRecordBuilder.sourceRecord(SourceRecordBuilder.java:40)
at io.confluent.connect.rabbitmq.ConnectConsumer.handleDelivery(ConnectConsumer.java:69)
at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:100)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
My standalone worker configuration file is this:
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
plugin.path=share/java,/home/ubuntu/confluent/share/confluent-hub-components
rest.port=5020
The Kafka connector to RabbitMQ works if there are no messages in RabbitMQ, but as soon as a message is passed to the RabbitMQ queue that the Kafka consumer is listening to is when the Kafka Connect RabbitMQ connector breaks and presents the above error message. So Kafka in the end connects to RabbitMQ, it just produces an error message as soon as it has a message from RabbitMQ ready to be sent back to Kafka.
I cannot figure out what is the issue, I have changed a few configuration options around like using the String and Avro converter but I keep on getting the above error.
Any help is greatly appreciated.
Thanks.