Subscribing to multiple topics


Sy

Sep 21, 2011, 7:05:05 AM
to MQ Telemetry Transport
I am currently writing my own implementation of the MQTT v3.1
specification, partly to get a better understanding of the protocol and
mainly because I've had limited success with the client source code
available on http://mqtt.org

The Java tool wmqttSample.jar works fine and I'm using it for
testing. However, it doesn't allow you to subscribe to multiple
topics at the same time.

If I subscribe to a single topic that has a retained message, I get a
publish response for that topic, which is correct.

However, if I subscribe to 3 topics in a single subscription message I
get 3 individual publish messages. The problem is that there is no
indication in the subscription response of how many messages to expect,
only the length of the first message to follow.

Also, there is a slight inconsistency in the protocol. In most cases
strings are encoded as UTF-8 with the length given first as a high byte
and a low byte. The publish response contains the topic in this format,
but the data message does not follow it: after the topic the data
message follows with no indication of length.

If I'm missing something here please correct me.

Thank you,
Simon

Roger Light

Sep 21, 2011, 7:43:51 AM
to mq...@googlegroups.com
Hi Simon,

> I've had limited success with the client source code
> available on http://mqtt.org

Did you try the mosquitto clients/client library? If you did, I'd be
very interested in hearing about any problems you had with them. They
definitely don't support subscribing to multiple topics at the same time!

> However, if I subscribe to 3 topics in a single subscription message I
> get 3 individual publish messages. The problem is that there is no
> indication in the subscription response of how many messages to expect,
> only the length of the first message to follow.

I'm not entirely sure what you mean here. The subscription response
(SUBACK) should contain a payload that indicates the granted
subscription level of each of your subscription topics. There is no
way of telling you what PUBLISH messages you should expect to get back
- if there is a retained message it will be sent and if new messages
appear that match your subscription they will be sent as well.
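
For illustration, a minimal sketch of reading that SUBACK payload. It
assumes the v3.1 layout (a 2-byte message ID followed by one granted-QoS
byte per requested topic, in request order) and that the fixed header has
already been stripped. The function name parseSubackBody is hypothetical,
not part of any library.

```php
<?php
// Hypothetical helper: parse the body of a SUBACK, i.e. the bytes that
// follow the fixed header (message ID + one granted-QoS byte per topic).
function parseSubackBody(string $body): array
{
    if (strlen($body) < 2) {
        throw new InvalidArgumentException('SUBACK body too short');
    }
    // Message ID: high byte first, then low byte
    $messageId = (ord($body[0]) << 8) | ord($body[1]);

    // One byte per subscribed topic; the granted QoS is in the low 2 bits
    $grantedQos = [];
    for ($i = 2; $i < strlen($body); $i++) {
        $grantedQos[] = ord($body[$i]) & 0x03;
    }
    return ['messageId' => $messageId, 'grantedQos' => $grantedQos];
}

// Example: message ID 10, three topics granted QoS 0, 1 and 2
$result = parseSubackBody("\x00\x0A\x00\x01\x02");
```

The number of entries in grantedQos matches the number of topics in the
SUBSCRIBE, but it says nothing about how many PUBLISH messages will arrive.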

> Also a slight inconsistency in the protocol is that in most cases
> strings are encoded as UTF-8 where the length is encoded as high byte
> and low byte, the publish response contains the topic in this format,
> however the data message is not, after the topic the data message
> follows with no indication of length.

When you receive a PUBLISH, you can determine the length of the data
message (the payload) by subtracting the length of the PUBLISH
variable header from the "remaining length". Does that answer your
question?
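
A rough sketch of that subtraction, assuming the v3.1 PUBLISH layout
(2-byte topic length, topic, then a 2-byte message ID only when QoS > 0).
The function names decodeRemainingLength and publishPayloadLength are made
up for this example.

```php
<?php
// Decode the variable-length "remaining length" field starting at $offset.
// Returns [decoded value, number of bytes the field occupied].
function decodeRemainingLength(string $data, int $offset = 1): array
{
    $value = 0;
    $multiplier = 1;
    $used = 0;
    do {
        // The field is at most 4 bytes long
        if ($used >= 4 || !isset($data[$offset + $used])) {
            throw new UnexpectedValueException('Malformed remaining length');
        }
        $byte = ord($data[$offset + $used]);
        $value += ($byte & 0x7F) * $multiplier;
        $multiplier *= 128;
        $used++;
    } while ($byte & 0x80); // top bit set means another byte follows
    return [$value, $used];
}

// Payload length = remaining length - length of the variable header
function publishPayloadLength(string $packet): int
{
    [$remaining, $used] = decodeRemainingLength($packet);
    $qos = (ord($packet[0]) >> 1) & 0x03;
    $body = 1 + $used; // index of the first variable-header byte
    $topicLength = (ord($packet[$body]) << 8) | ord($packet[$body + 1]);
    // 2 length bytes + topic + optional 2-byte message ID
    $variableHeader = 2 + $topicLength + ($qos > 0 ? 2 : 0);
    return $remaining - $variableHeader;
}

// QoS 0 PUBLISH, topic "a/b", payload "hey": remaining length is 8
$payloadLength = publishPayloadLength("\x30\x08\x00\x03a/bhey");
```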

Cheers,

Roger

Sy

Sep 21, 2011, 8:17:35 AM
to MQ Telemetry Transport
Hi Roger,

According to page 29 of the MQTT v3.1 Protocol Specification:

"A server may start sending PUBLISH messages due to the subscription
before the client receives the SUBACK message."

As a test I am publishing 3 topics with the retain flag set. I then
subscribe to all three topics in a single call and get back three
separate PUBLISH messages and then finally a SUBACK, in that
order, and it's very repeatable and reliable.

I will publish my decoder for this when it's complete; it's getting
there.

Regards,
Simon

On Sep 21, 12:43 pm, Roger Light <ro...@atchoo.org> wrote:
> Hi Simon,
>
> > I've had limited success with the client source code
> > available on http://mqtt.org

Roger Light

Sep 21, 2011, 8:25:34 AM
to mq...@googlegroups.com
Hi Simon,

> According to page 29 of the MQTT v3.1 Protocol Specification:
>
> "A server may start sending PUBLISH messages due to the subscription
> before the client receives the SUBACK message."
>
> As a test I am publishing 3 topics with the retain flag set. I then
> subscribe to all three topics in a single call and get back three
> separate PUBLISH messages and then finally a SUBACK, in that
> order, and it's very repeatable and reliable.

That seems to be what I'd expect, and consistent with the spec. Can you
explain the problem?

Thanks,

Roger

Simon Platten

Sep 21, 2011, 8:47:29 AM
to mq...@googlegroups.com
Not so much a problem as something that could be improved upon. In most instances there is a response to each request and the protocol is pretty good at specifying what to expect and how long it will be. But in this instance you don't know how many responses to expect, so you have to keep reading the socket until you get nothing back.

--
To learn more about MQTT please visit http://mqtt.org

To post to this group, send email to mq...@googlegroups.com
To unsubscribe from this group, send email to
mqtt+uns...@googlegroups.com

For more options, visit this group at
http://groups.google.com/group/mqtt



--
Regards,
Sy

Nicholas O'Leary

Sep 21, 2011, 9:01:03 AM
to mq...@googlegroups.com
Hi Simon,

> Also a slight inconsistency in the protocol is that in most cases
> strings are encoded as UTF-8 where the length is encoded as high byte
> and low byte, the publish response contains the topic in this format,
> however the data message is not, after the topic the data message
> follows with no indication of length.

The protocol makes no statement about what the payload contains. It is
simply an array of bytes that contains whatever data the publisher has
put in there, which is why there is no UTF-8 length header. As Roger
says, determining the length of the payload is a simple task given the
remaining length field.

> Not so much of a problem but something that could be improved upon.
> In most instances there is a response to each request and the protocol is
> pretty good at specifying what and how long to expect. But in this instance
> you don't know how many responses to expect, so you have to keep reading
> the socket until you get nothing back.

The Publishes you receive as a result of a Subscribe flow are not
considered responses to the Subscribe. They are new flows in their own
right - with their own unique message ID (assuming QoS >0) - that
happen to interleave with the Sub-Suback flow.

Once you subscribe, you need to be watching for new data from the
socket anyway, so this should not be a problem.

The main point is that a client should make sure its Publish handling
is in place before it sends the Subscribe and not assume it will
receive the SubAck before any publishes will begin to arrive.
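
To make that concrete, here is a small sketch that splits a received byte
buffer into packets and handles each by type, so PUBLISH messages arriving
before the SUBACK are simply processed as they come. It works on an
in-memory string rather than a socket, and the function names splitPackets
and handlePackets are assumptions for the example only.

```php
<?php
const MQTT_PUBLISH = 3; // message type values from the v3.1 spec
const MQTT_SUBACK  = 9;

// Split a byte buffer into complete packets; stops at the first
// incomplete or malformed one.
function splitPackets(string $buffer): array
{
    $packets = [];
    $pos = 0;
    $end = strlen($buffer);
    while ($pos < $end) {
        $first = ord($buffer[$pos]);
        $remaining = 0;
        $shift = 0;
        $i = $pos + 1;
        do {
            if ($i >= $end || $shift > 21) {
                break 2; // length field incomplete or longer than 4 bytes
            }
            $byte = ord($buffer[$i++]);
            $remaining |= ($byte & 0x7F) << $shift;
            $shift += 7;
        } while ($byte & 0x80);
        if ($i + $remaining > $end) {
            break; // body not fully received yet
        }
        $packets[] = [
            'type' => $first >> 4,
            'qos'  => ($first >> 1) & 0x03,
            'body' => (string) substr($buffer, $i, $remaining),
        ];
        $pos = $i + $remaining;
    }
    return $packets;
}

// Handle packets in arrival order; a SUBACK does not end the stream.
function handlePackets(array $packets): array
{
    $messages = [];
    $subscribed = false;
    foreach ($packets as $p) {
        if ($p['type'] === MQTT_PUBLISH) {
            $len = (ord($p['body'][0]) << 8) | ord($p['body'][1]);
            $topic = substr($p['body'], 2, $len);
            // Skip the 2-byte message ID that is present when QoS > 0
            $skip = 2 + $len + ($p['qos'] > 0 ? 2 : 0);
            $messages[$topic] = (string) substr($p['body'], $skip);
        } elseif ($p['type'] === MQTT_SUBACK) {
            $subscribed = true;
        }
    }
    return [$messages, $subscribed];
}

// Two retained PUBLISH messages followed by the SUBACK
$stream = "\x30\x06\x00\x01aone" . "\x30\x06\x00\x01btwo"
        . "\x90\x04\x00\x01\x00\x00";
[$messages, $subscribed] = handlePackets(splitPackets($stream));
```

In a real client the same dispatch would sit in the normal receive loop
that runs for the life of the connection, not in subscribe-specific code.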

Regards,
Nick

Sy

Sep 21, 2011, 10:31:05 AM
to MQ Telemetry Transport
Hi Nick,

Thanks for the reply. I've now coded for this scenario and it seems to
work well.

[code]
do {
    // Read up to 5 bytes: 1 type/flags byte plus a 1 to 4 byte
    // "remaining length" field, possibly followed by message bytes
    $strData = $this->read( 5 );

    if ( strlen($strData) < 2 ) {
        // Nothing to read, exit
        break;
    }
    $intOffset = 0;
    // The message type is the high nibble of the first byte
    $intMsgType = ord($strData[0]) >> 4;

    if ( $intMsgType == MQTT_PUBLISH ) {
        // On return $intOffset is the index of the first byte
        // after the fixed header
        $intRemaining = $this->decodeRemaining( $strData, 1, $intOffset );

        if ( $intOffset > 0 ) {
            $strData = substr($strData, $intOffset);
        } else {
            $strData = "";
        }
        // Look for the remaining data
        $intOffset = strlen($strData);
        $strData .= $this->read( $intRemaining - $intOffset );
        $intMsgLength = strlen($strData);

        if ( $intMsgLength != $intRemaining ) {
            $this->errorMsg( "Expected {$intRemaining} bytes, " .
                             "received: {$intMsgLength} bytes<br />" .
                             $this->dump($strData) );
            break;
        }
        // Extract the topic: 2 length bytes (hi & lo) followed by the name
        $intLength = (ord($strData[0]) << 8) | ord($strData[1]);
        $strTopic = substr($strData, 2, $intLength);

        // Everything after the topic is the payload. This assumes QoS 0;
        // at QoS 1 or 2 a 2-byte message ID sits between topic and payload.
        $strMsg = (string) substr($strData, $intLength + 2);

        if ( isset($this->m_arySubs[$strTopic]) ) {
            $this->m_arySubs[$strTopic]['msg'] = $strMsg;
        }
    } else if ( $intMsgType == MQTT_SUBACK ) {
        // Consume the rest of the SUBACK so it isn't left on the socket
        $intRemaining = $this->decodeRemaining( $strData, 1, $intOffset );
        $intExtra = strlen($strData) - $intOffset;
        if ( $intRemaining > $intExtra ) {
            $this->read( $intRemaining - $intExtra );
        }
        break;
    } else {
        $this->errorMsg( "Subscription failed:" .
                         $this->dump($strData) );
        return false;
    }
} while ( true );
[/code]

It's written in PHP, is not exhaustive and is still a work in progress.
When I've implemented the entire class I will publish it.