MQTT client consumer not subscribing to RabbitMQ queue

233 views
Skip to first unread message

Bryan Forst

unread,
Jul 12, 2019, 8:17:09 AM7/12/19
to rabbitmq-users
OK, now for a my noobie question... 

I am using a Java server to publish message to a Rabbit queue like so:
-----------------------------------------------------------------------------------------
public static void SendMessage(String queue, String message){
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        log.debug("factory created");
        try (Connection connection = factory.newConnection();
              Channel channel = connection.createChannel()) {
              channel.queueDeclare(queue, false, false, false, null);
              channel.basicPublish("", queue, null, message.getBytes());
        } catch (IOException ex ){
            log.error("MessageManager.SendMessage error sending message " + ex);
        } catch (TimeoutException ex){
            log.error("MessageManager.SendMessage timeout  sending message " + ex);
        }
    }
--------------------------------------

Fine, I can see message on the Rabbit admin. 
I use  Paho eclipse on the Javascript client side to consume from the queue
----------------------------------------
function onloadFunction(){
// Create a client instance
client = new Paho.MQTT.Client("localhost", 15675, "/ws", "/:mqtt-user");

// set callback handlers
client.onConnectionLost = onConnectionLost;
client.onMessageArrived = onMessageArrived;

// connect the client
client.connect({onSuccess:onConnect});
}
function onConnect() {
  console.log("onConnect");
  client.subscribe("vom", {qos: 1});
}
// called when the client loses its connection
function onConnectionLost(responseObject) {
  if (responseObject.errorCode !== 0) {
    console.log("onConnectionLost:"+responseObject.errorMessage);
  }
}
// called when a message arrives
function onMessageArrived(message) {
  console.log("onMessageArrived:"+message.payloadString);
}
------------------------------------------------------------------------
The write to the "vom" queue on the server. 
I have tried variations per what I understand is the difference between Rabbit queues and MQTT topics (amq/topic/vom, /topic/vom, /vom, etc, etc, etc)
Nothing seems to work. It sits for a while and then eventually disconnects. 
--------------------------
onConnect
onConnectionLost:AMQJS0008I Socket closed.
-------------------------

I am sure I am doing something very simple wrong, What am I missing? 

Thanks, 
Bryan

Luke Bakken

unread,
Jul 12, 2019, 12:28:29 PM7/12/19
to rabbitmq-users
Hi Bryan,

MQTT uses a topic exchange: https://www.rabbitmq.com/mqtt.html

Your code is publishing to the default "unnamed" exchange which is a direct exchange.

So, be sure you're publishing to the correct exchange (amq.topic) and ensure the "vom" queue is bound to it with the appropriate binding information.

Thanks,
Luke

Bryan Forst

unread,
Jul 13, 2019, 2:06:59 PM7/13/19
to rabbitmq-users
Thanks Luke, 

The exchange setup is one of my worries about using MQTT. 
I will need to create many different , short-lived queues on the Rabbit server from Java 
and am not sure if this MQTT exchange and binding requirement will be manageable. 

Bryan

Bryan Forst

unread,
Jul 14, 2019, 9:48:27 AM7/14/19
to rabbitmq-users
still the same results after changing the java. Based on this item

 try (Connection connection = factory.newConnection();
              Channel channel = connection.createChannel()) {
            log.debug("queue being declared");
            channel.queueDeclare(queue, false, false, false, null);
            channel.exchangeDeclare("amq.topic", BuiltinExchangeType.DIRECT);
            channel.queueBind(queue, "amq.topic", "vom");
            log.debug("queue created");
            channel.basicPublish("amq.topic", "info", null, message.getBytes());

And my client simply changed: 
function onloadFunction(){
// Create a client instance
client = new Paho.MQTT.Client("localhost", 15675, "/ws", "/:mqtt-user");

// set callback handlers
client.onConnectionLost = onConnectionLost;
client.onMessageArrived = onMessageArrived;

// connect the client
client.connect({onSuccess:onConnect});
}
function onConnect() {
  console.log("onConnect");
  client.subscribe("vom", {qos: 1});
}
-------------------------------------------

Luke Bakken

unread,
Jul 15, 2019, 11:15:44 AM7/15/19
to rabbitmq-users
Hi Bryan,

amq.topic is a default exchange that is pre-declared by RabbitMQ so there's no need to do it yourself. Your code is trying to declare it as a direct exchange anyway.

When you publish to the amq.topic exchange the messages's routing key must match a topic that is used to bind the queue to the exchange - https://www.rabbitmq.com/tutorials/tutorial-five-java.html

So, there are a couple of issues here.

Thanks,
Luke
Reply all
Reply to author
Forward
0 new messages