Kafka Connect RabbitMQ Consuming Errors


Gjergj Baca

Dec 18, 2018, 11:59:34 AM
to Confluent Platform
Hi,

I am using the Kafka Connect RabbitMQ connector. I want Kafka to consume from a queue in RabbitMQ.

The error message I am receiving is this:

[2018-12-18 16:52:56,127] ERROR Consumer io.confluent.connect.rabbitmq.ConnectConsumer@615683e8 (amq.ctag-OXT3fZ-b5QWdjL4L3UUAzA) method handleDelivery for channel AMQChannel(amqp://ad...@172.31.43.51:5672/master,1) threw an exception for channel AMQChannel(amqp://ad...@172.31.43.51:5672/master,1) (com.rabbitmq.client.impl.ForgivingExceptionHandler:124)
java.lang.NullPointerException
at io.confluent.connect.rabbitmq.MessageConverter.basicProperties(MessageConverter.java:127)
at io.confluent.connect.rabbitmq.SourceRecordBuilder.sourceRecord(SourceRecordBuilder.java:40)
at io.confluent.connect.rabbitmq.ConnectConsumer.handleDelivery(ConnectConsumer.java:69)
at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:100)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

My standalone worker configuration file is this:

bootstrap.servers=localhost:9092

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter


key.converter.schemas.enable=false
value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets


plugin.path=share/java,/home/ubuntu/confluent/share/confluent-hub-components
rest.port=5020

The connector runs fine as long as the RabbitMQ queue is empty, but as soon as a message arrives on the queue it is listening to, the connector fails with the error above. So the connection to RabbitMQ itself succeeds; the error only appears once there is a message from RabbitMQ ready to be sent to Kafka.

I cannot figure out what the issue is. I have tried changing a few configuration options, such as using the String and Avro converters, but I keep getting the same error.

Any help is greatly appreciated.

Thanks.

Ayd Asraf

Jan 16, 2019, 8:12:28 AM
to Confluent Platform
Did you manage to find a solution? I am facing the exact same issue. :(

robeid

Feb 12, 2019, 12:29:16 PM
to Confluent Platform
I have the same problem.

Which JDK are you using to run Kafka Connect?


grzegor...@g2a.com

Apr 8, 2019, 11:38:42 AM
to Confluent Platform
Hi,

I am struggling with the same problem, so far without success.

Have you solved the problem on your side?


Kesavan E

Jul 18, 2019, 1:32:03 PM
to Confluent Platform
Hi, 

I had the same issue, and it has to do with the RabbitMQ message itself. Messages published to RabbitMQ need to have Basic Properties assigned; that will solve the issue.

Let me know if you are still trying to resolve this and I will post the details.
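
As a rough illustration of why a message published without properties could trigger a NullPointerException like the one in the stack trace, here is a minimal, self-contained Java sketch. Props, convertUnsafe and converts are hypothetical stand-ins, not the connector's or the RabbitMQ client's actual classes, and the assumption that the converter dereferences a property without a null check is mine. With the real RabbitMQ Java client, the corresponding fix would be to build an AMQP.BasicProperties (for example with AMQP.BasicProperties.Builder) and pass it to Channel.basicPublish rather than passing null.

```java
import java.util.HashMap;
import java.util.Map;

public class BasicPropertiesSketch {

    // Hypothetical stand-in for a message's AMQP properties.
    // This is NOT the RabbitMQ client's AMQP.BasicProperties class.
    static final class Props {
        final String contentType;          // null if the publisher set nothing
        final Map<String, Object> headers; // null if the publisher set nothing

        Props(String contentType, Map<String, Object> headers) {
            this.contentType = contentType;
            this.headers = headers;
        }
    }

    // Hypothetical converter that assumes every property is present.
    // Illustrative only; the real connector code may differ.
    static Map<String, Object> convertUnsafe(Props props) {
        Map<String, Object> out = new HashMap<>();
        // Throws NullPointerException if props or contentType is null.
        out.put("contentType", props.contentType.toLowerCase());
        // Throws NullPointerException if headers is null.
        out.put("headerCount", props.headers.size());
        return out;
    }

    // Returns true if the conversion succeeds, false if it hits a NullPointerException.
    static boolean converts(Props props) {
        try {
            convertUnsafe(props);
            return true;
        } catch (NullPointerException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Message published with no properties at all.
        Props missing = null;
        // Message published with a properties object whose fields are unset.
        Props empty = new Props(null, null);
        // Message published with properties filled in.
        Map<String, Object> headers = new HashMap<>();
        headers.put("source", "demo");
        Props populated = new Props("application/json", headers);

        System.out.println("no properties:        " + converts(missing));
        System.out.println("empty properties:     " + converts(empty));
        System.out.println("populated properties: " + converts(populated));
    }
}
```

Running main prints false for the first two cases and true for the third, which matches the behaviour described above: the consumer only fails once a message without usable properties arrives.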

Thanks,
Kesavan


