Support Backpressure option for MQTT inputs


Sajay Antony

Apr 14, 2016, 5:49:25 AM
to Node-RED
MQTT supports backpressure through `mqtt.Client#handleMessage(packet, callback)`. The default MQTT input doesn't expose this capability in any way, and hence it can cause a flood of messages when starting up, particularly if we use durable/persistent clients with a large number of outstanding messages. A potential solution would be to include a flag in the configuration that tells the broker to use handleMessage instead of subscribe, so that the completion can be handled by the flow.
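A minimal sketch of the idea, assuming only the documented contract that the client delivers the next message after the `handleMessage` callback has been called. `FakeClient`, `pump` and `pending` are hypothetical names used for illustration; this is a stand-in, not the MQTT.js client or the Node-RED MQTT node.

```javascript
// FakeClient is a hypothetical stand-in that mimics only the delivery
// contract: the next packet is delivered after the previous callback fires.
class FakeClient {
  constructor(packets) {
    this.queue = packets.slice();
    this.delivered = [];
  }

  // Default handler: acknowledge immediately, so nothing is held back.
  handleMessage(packet, callback) {
    callback();
  }

  // Deliver one packet, then wait for its callback before the next one.
  pump() {
    const packet = this.queue.shift();
    if (!packet) return;
    this.delivered.push(packet.payload);
    this.handleMessage(packet, () => this.pump());
  }
}

const client = new FakeClient([
  { topic: 'jobs', payload: 'batchJob1' },
  { topic: 'jobs', payload: 'batchJob2' },
]);

// Override: park the callback until the flow reports completion.
const pending = [];
client.handleMessage = (packet, callback) => {
  pending.push(callback);
};

client.pump();
const beforeCompletion = client.delivered.slice(); // batchJob1 only
pending.shift()();                                 // flow finished batchJob1
const afterCompletion = client.delivered.slice();  // batchJob2 now delivered
console.log(beforeCompletion, afterCompletion);
```

The point of the sketch is that the second message is not delivered until something calls the parked callback, which is also why a flow that never calls it stalls the input.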


Does this sound reasonable?

Nicholas O'Leary

Apr 14, 2016, 10:59:15 AM
to Node-RED Mailing List
The main issue with this approach is that you are adding a JavaScript function to the msg object as msg.mqttHandleMessageComplete. Whilst it technically works, it is not Node-RED practice to do this. Messages should be JSON encodable/decodable. (We know we still have some issues with the HTTP nodes that break this, but we're slowly sorting that out.)
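To illustrate the concern, here is a small sketch of what happens to a function property when a message goes through a JSON round trip. The topic and payload values are made up for the example.

```javascript
// A message carrying a completion function, as described above.
const msg = {
  topic: 'jobs',
  payload: 'schedule batchJob1',
  mqttHandleMessageComplete: function () {
    // would tell the MQTT client to deliver the next message
  },
};

// JSON.stringify omits function-valued properties, so the handler is lost.
const roundTripped = JSON.parse(JSON.stringify(msg));

console.log(typeof msg.mqttHandleMessageComplete);          // "function"
console.log(typeof roundTripped.mqttHandleMessageComplete); // "undefined"
console.log(roundTripped.payload);                          // payload survives
```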

It also puts the burden on the flow author to know they have to call this function at some point in their flow. If the flow completes and they never call that function, it will block future messages forever.

If you're concerned with floods of messages, you could use a delay node to rate limit the flow processing after the MQTT node; not completely ideal, but workable.

Nick


--
http://nodered.org
 
Join us on Slack to continue the conversation: http://nodered.org/slack
---
You received this message because you are subscribed to the Google Groups "Node-RED" group.
To unsubscribe from this group and stop receiving emails from it, send an email to node-red+u...@googlegroups.com.
To post to this group, send email to node...@googlegroups.com.
Visit this group at https://groups.google.com/group/node-red.
For more options, visit https://groups.google.com/d/optout.

Sajay Antony

Apr 14, 2016, 12:33:54 PM
to node...@googlegroups.com
Thanks for the response :)

I was assuming that the msg object can hold state, similar to the HTTP response node. But if that is not a desired practice, then I understand the concern.

A delay still doesn't solve the problem, since the intent is to stop input completely and not consume messages that cannot be handled until an exception path or something similar resumes processing, i.e. flow control. A rate limiter still needs acquire/release semantics, and these have to be inherent to the input. The same argument applies, that someone might forget to release the throttle, but I guess this is necessary if we want to allow proper rate control. Are you thinking that the flow/runtime could handle the throttle and, if throttling is enabled, release it on completion of the flow?
Can a node subscribe to an instance completion event?
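The acquire/release semantics mentioned here could be sketched roughly as follows. `Gate` is a hypothetical helper written for this illustration; it is not part of Node-RED or MQTT.js, and a real input node would need to decide who calls `release`.

```javascript
// Gate is a hypothetical helper, not part of Node-RED or MQTT.js.
class Gate {
  constructor(limit) {
    this.limit = limit;   // maximum number of messages in flight
    this.inFlight = 0;
    this.waiting = [];    // tasks held back until a slot is released
  }

  // Run the task now if a slot is free, otherwise queue it.
  acquire(task) {
    if (this.inFlight < this.limit) {
      this.inFlight += 1;
      task();
    } else {
      this.waiting.push(task);
    }
  }

  // Hand the freed slot to the next waiting task, or mark it free.
  release() {
    const next = this.waiting.shift();
    if (next) {
      next();
    } else if (this.inFlight > 0) {
      this.inFlight -= 1;
    }
  }
}

const gate = new Gate(1);
const started = [];
gate.acquire(() => started.push('batchJob1'));
gate.acquire(() => started.push('batchJob2')); // held: the slot is taken
const startedBeforeRelease = started.slice();
gate.release();                                 // batchJob1 done, batchJob2 starts
const startedAfterRelease = started.slice();
console.log(startedBeforeRelease, startedAfterRelease);
```

If `release` is never called, the second job never starts, which is the same forgotten-release risk raised earlier in the thread.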

I'm curious what the thinking is regarding the proper way to do this, since backpressure/throttling of flows seems like the right thing to do out of the box.


Dave C-J

Apr 14, 2016, 2:34:13 PM
to node...@googlegroups.com
Well, I guess it also depends on what the real use case is. On reconnection, do you actually want all those messages? I guess you do, or you would use a cleansession connection to the broker. Or maybe you can filter out many of the arriving messages to create a more manageable rate, dropping 3 out of 4 for example? Or you could create a local queue (this example does it in memory: http://flows.nodered.org/flow/cea8afa28b7a93ebdc0f), but of course that may have limits too.
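As a rough sketch of the "drop 3 out of 4" option, the filter below keeps one message in every n. `makeSampler` is a hypothetical name; the body is the kind of logic one might put in a Function node, where returning null passes nothing on.

```javascript
// Returns a filter that passes one message in every n and drops the rest.
function makeSampler(n) {
  let seen = 0;
  return function (msg) {
    const keep = seen % n === 0;
    seen += 1;
    return keep ? msg : null; // null means "send nothing on"
  };
}

const sample = makeSampler(4);
const kept = [];
for (let i = 1; i <= 8; i += 1) {
  const out = sample({ payload: i });
  if (out) kept.push(out.payload);
}
console.log(kept); // payloads 1 and 5 are kept
```

Note that this discards messages rather than deferring them, so it only suits cases where losing messages is acceptable.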

Dave C-J

Apr 14, 2016, 5:37:44 PM
to node...@googlegroups.com
Sajay,
I see you have gone ahead and created a fork of the MQTT node anyway. Please note that as Nick said this may/will break in the future if the underlying transport from node to node does not cope with stringifying attached functions.

Sajay Antony

Apr 20, 2016, 2:40:03 AM
to node...@googlegroups.com
> On reconnection do you actually want all those messages?
Yes. For example:
schedule batchJob1
schedule batchJob2
 - You don't want to pick up batchJob2 until you have completed batchJob1.

The motivation is to have a set of machines capable of remote job scheduling - https://github.com/SajayAntony/cmdport 

BTW, this whole thing is awesome, and so are you folks. I agree it might break in the future, but at this point it's just a prototype to build something out without having to rewrite the entire MQTT node semantics.
I might be missing a lot of information here, such as:


1. How to configure the broker dynamically from settings.
2. How to configure credentials dynamically through config/variable binding.
3. How to compose synchronous and async models of message dispatching properly.




Dave C-J

Apr 20, 2016, 3:42:32 AM
to node...@googlegroups.com
Hi,

Most fields (apart from ones expecting numbers etc.) can use a syntax like
    $(env_var)
where env_var is an external system environment variable. So you could set a variable, e.g.
    export BROKER=192.168.2.100
then internally use $(BROKER).
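A sketch of how such a substitution could work, assuming the whole field value has the form `$(NAME)`. `resolveEnvField` is a hypothetical function written for illustration; it is not Node-RED's own implementation, and the fallback of leaving an unset variable untouched is also an assumption.

```javascript
// Hypothetical helper: resolve a field of the form $(NAME) from an env map.
function resolveEnvField(value, env) {
  const match = /^\$\((\S+)\)$/.exec(value);
  if (!match) return value; // not a $(NAME) reference: use the value as typed
  const name = match[1];
  // Assumption: an unset variable leaves the field unchanged.
  return Object.prototype.hasOwnProperty.call(env, name) ? env[name] : value;
}

const exampleEnv = { BROKER: '192.168.2.100' };
console.log(resolveEnvField('$(BROKER)', exampleEnv)); // 192.168.2.100
console.log(resolveEnvField('localhost', exampleEnv)); // localhost
console.log(resolveEnvField('$(MISSING)', exampleEnv)); // $(MISSING)
```

In a running process, `process.env` would take the place of `exampleEnv`.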

Dave C-J

Apr 20, 2016, 3:43:19 AM
to node...@googlegroups.com
PS - please change the license on your node back to Apache-v2. You cannot just copy 99% of the code and re-license it.

Sajay Antony

Apr 20, 2016, 4:01:37 AM
to node...@googlegroups.com
License fixed. Apologies for the incorrect license.

Is there any other attribution that I need to do like point to the original source?

If the variable syntax works, then I guess I could set the environment variable with process.env.BROKER = "mybroker" and reuse that as $(BROKER). Is this documented anywhere?
Do credentials also work the same way?


Sajay Antony

Apr 20, 2016, 4:19:59 AM
to node...@googlegroups.com

Dave C-J

Apr 20, 2016, 5:01:43 AM
to node...@googlegroups.com
Thanks for fixing that. No, I think the rest is good as-is.
And yes, process.env.BROKER will work, but only at start time when the config is read.
(E.g. you can also configure things in settings.js in the same way, using process.env in there to read external variables.)
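The "only at start time" caveat can be illustrated with a small sketch. `loadSettings` and `brokerHost` are hypothetical names, not real Node-RED settings; a plain object stands in for process.env so the example is self-contained.

```javascript
// Hypothetical settings loader: reads the environment once, when called.
function loadSettings(env) {
  return {
    brokerHost: env.BROKER || 'localhost', // captured at load time
  };
}

const fakeEnv = { BROKER: '192.168.2.100' }; // stand-in for process.env
const settings = loadSettings(fakeEnv);

fakeEnv.BROKER = '10.0.0.5';      // a later change to the environment...
console.log(settings.brokerHost); // ...is not seen: still 192.168.2.100
```

Picking up a changed value would require reading the configuration again, for example on restart.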
I don't believe credentials do the same thing. Maybe they should, but I'm not sure that setting credentials so they are totally visible to the rest of the system would be a good idea.

Sajay Antony

Apr 21, 2016, 3:30:31 AM
to Node-RED
It looks like credentials are being deleted from the clone, and I don't think they are set up from env vars. I was hoping to remove all the goo in my fork that takes values from settings and just resort to env vars.