MQTT and Phoenix Channels

932 views
Skip to first unread message

tra...@gmail.com

unread,
Sep 21, 2015, 3:08:24 PM9/21/15
to phoenix-talk
Hi everyone. I am new to Phoenix, Elixir and Erlang, but I've got a real-time communication problem to solve and I want to build a solution with Phoenix. In broad terms, I need to build a server where sensors can publish data and subscribers can receive that data in real time. Sensors will publish using MQTT and HTTP, while clients subscribe with Websockets. The server should also present a web interface to clients, where they can pick which sensors they want to subscribe to. Seems like a perfect match for Phoenix.

The first issue here is adding MQTT functionality to Phoenix. I've seen it mentioned a couple of times that it is definately possible, but I'm not really sure how to approach it. Here's what I'm thinking: 

VerneMQ is a MQTT broker written in Erlang. It is built as a set of OTP applications, and runs on top of Ranch and Cowboy. I suppose it's possible to run this in the same VM as Phoenix - but are there any common issues I need to be aware of?   

Next problem is getting the MQTT messages from VerneMQ into Phoenix. I'm lost here. VerneMQ supports plugins, so I could write a plugin that calls into some Elixir code. But I'm not sure it's a good idea. I'd prefer the same relationship between Phoenix and VerneMQ as between Phoenix and Cowboy. But I don't know how that works. I guess Cowboy receives HTTP requests and then pass them on by calling a callback registered in a Phoenix Endpoint?

I have plenty of questions regarding what happens once the message gets to a Phoenix endpoint, but I think I'll leave those for later. I'm sorry about the vague questions! I'm very excited about Phoenix and Elixir - but also kinda confused. Can anyone point me in the right direction?

Best regards
Troels

Preston Marshall

unread,
Sep 21, 2015, 4:31:12 PM9/21/15
to phoeni...@googlegroups.com
I've used VerneMQ with Phoenix, so I can help you a bit. The way I did it is to install VerneMQ separately, and used an Elixir MQTT client to subscribe to queues. You could probably deploy it all as one thing, but I didn't ever attempt to do that. For a large scale production you'd probably want your cluster of MQTT servers running separately from your phoenix apps anyways. Once you have subscribed to a queue, you can just Endpoint.broadcast! the messages out to channels, and do what you want from there. I ended up modeling my actual devices as GenServers and adding them to a supervisor tree. The servers receive messages from the device and can change state accordingly. You can also call functions on the servers (GenServer.call) to send messages back down to the devices. Right now this is part of my Phoenix application, but I can see it moving to its own app very soon.

Let me know if you have any more questions.

Best,
Preston

--
You received this message because you are subscribed to the Google Groups "phoenix-talk" group.
To unsubscribe from this group and stop receiving emails from it, send an email to phoenix-talk...@googlegroups.com.
To post to this group, send email to phoeni...@googlegroups.com.
Visit this group at http://groups.google.com/group/phoenix-talk.
To view this discussion on the web visit https://groups.google.com/d/msgid/phoenix-talk/80738a03-97ff-49c2-8df9-cbeff0c3571e%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

tra...@gmail.com

unread,
Sep 22, 2015, 10:28:13 AM9/22/15
to phoenix-talk, bbh...@bbhoss.io
Hi Preston, thanks for your quick reply.

I suppose I might be taking the wrong approach. Your way of doing it is definitely also an option. I was simply inspired by how Channels are transport agnostic - and I've heard it specifically mentioned at a few talks that Phoenix should be able to talk MQTT. And since the only purpose of this cluster is to receive sensor data and publish it to subscribers (and present a web interface for managing the cluster and connected sensors), I thought that putting it all in one app could make sense.

But I might end up doing it like you - it seems more approachable at this point. Did you write your own MQTT client or did you use an existing one?

Also: MQTT can have nested topics like /sensors/uuid/battery while Phoenix apparently only supports topic:subtopic. Did you do anything special about that?

Best regards,
Troels

Chris McCord

unread,
Sep 22, 2015, 10:33:48 AM9/22/15
to phoeni...@googlegroups.com
Preston’s approach is definitely a good one and a viable option for you. But as you said, you could also go the transport route. I would love to see an MQTT transport written for Phoenix if you’re up for the task. If you check out the WebSocket transport, you should get an idea of what it takes (which isn’t much). Also, to be clear topics are arbitrary strings, but our convention is “topic:subtopic”. Nothing is stopping you from doing “sensors:1234:battery”/ Hope that helps!




Josh Adams

unread,
Sep 22, 2015, 10:45:47 AM9/22/15
to phoeni...@googlegroups.com
For what it's worth, the websocket transport and the long poll transport are both between 120 and 200 lines of code.  Was looking at this yesterday when considering adding a transport, though I think in our case we might just use the websocket transport and call it day.


For more options, visit https://groups.google.com/d/optout.



--
[mail]
3800 Colonnade Pkwy
Suite 140
Birmingham, AL 35242

Preston Marshall

unread,
Sep 22, 2015, 3:05:32 PM9/22/15
to phoenix-talk
So, there exists this: https://github.com/larshesel/phoenix_pubsub_vernemq but it seems that this is just a backend for channels, not a true integration. It ALMOST works, but there is an impedance mismatch between MQTT queues (and the various ways to bind to them) and channels. For it to be truly integrated, I think this might need to be resolved. Otherwise, we're stuck with just the simple wildcard binding. In addition to that, that specific library requires everything coming from MQTT to be an Erlang term that translates into a %Phoenix.Socket.Message{}. I think a stab at making a transport would be really cool though, please keep us updated if you do that, as I think it would be very useful to the IoT Phoenix community.

Best,
Preston

tra...@gmail.com

unread,
Sep 22, 2015, 4:17:56 PM9/22/15
to phoenix-talk
Yes, I found phoenix_pubsub_vernemq yesterday as well, but as you noted it's meant as an alternative to pg2 and Redis for Pubsub.

Preston, could you elaborate on the mismatch between MQTT queues and channels?

Also, thanks for clearing up my confusion around Channel topics. I also thought it was a pretty arbitrary restriction, but assumed it could only be topic:subtopic since that was the only examples I've found.

I would love to write an MQTT transport for Phoenix, but I'm very new to Elixir. I've looked a bit at the code for channels, transports, sockets and endpoints, trying to figure out how it all fits together. Still kind of stuck. I guess my next step is to try and do a mind-map of how everything is related. Then once I've figured out where I need to start, I'll be back with more questions :)

Best regards,
Troels

Preston Marshall

unread,
Sep 22, 2015, 10:55:17 PM9/22/15
to phoeni...@googlegroups.com
With MQTT, you can do things like this:
Publish to: 

"airdata/austin/co2"
"airdata/austin/co"
"airdata/dallas/co"
"airdata/dallas/co2"
"airdata/dallas/so2"

And scoop all of the co2 data up for all cities with a simple subscription to "airdata/+/co2". Also, instead of using * for a wildcard, they use #, and it matches that "level" in the topic, instead of just being a simple string wildcard like Phoenix. Finally MQTT has a lot of different tunables surrouding QoS, which I don't think we can easily support right now (I could be wrong). Basically Phoenix is very simple to keep it as flexible as possible, but this also is currently holding us back from using channels with MQTT to their full potential. I think some serious thought should go into how we want to handle this, as IoT looks like a great application for Elixir, Phoenix, and friends. Let me know if that makes sense.

Thanks,
Preston



Chris McCord

unread,
Sep 22, 2015, 11:03:04 PM9/22/15
to phoeni...@googlegroups.com
MQTT’s QoS is very interesting and something we can explore. 1.0 was about getting a solid and stable base that we can grow from, which I think we’ve done very well. From here, I’ll continue to practice what we’ve done all along, and that’s on-demand-driven-development. I agree web of things is an area that Phoenix is extremely well suited for, but we need clear usecases and features to drive development of things like QoS or pubsub with wildcards. So far no clear problems have been presented in these areas, but I encourage folks to use this mailing list to talk about these kinds of things and discuss feature requests


On Sep 22, 2015, at 10:55 PM, Preston Marshall <bbh...@bbhoss.io> wrote:

QoS

tra...@gmail.com

unread,
Sep 23, 2015, 3:27:08 AM9/23/15
to phoenix-talk, bbh...@bbhoss.io
Yeah, implementing MQTT subscription semantics correctly might require changes at the Channel and/or Pubsub layer. Listing the differences (see http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718106 for details):

Phoenix '*' and MQTT '#' are almost the same, except # is only a wildcard when it is directly preceded by a topic-level separator ('/'). Thus 'airdata/dallas/#' will match any topics starting with 'airdata/dallas/', such as 'airdata/dallas/co2' and 'airdata/dallas/co2/alerts'. But 'airdata/dallas/co#' will only match 'airdata/dallas/co#', not 'airdata/dallas/co2' or anything else. 

In MQTT, '+' also acts as a wildcard, but only matches one topic level, as per Prestons example. As with '#', it is only treated as a wildcard when directly preceded by '/'.

I guess the big problem here is that MQTT has semantics about topic levels built into the spec, while these rely on convention in Phoenix, and thus we can't use topic level separators to specify special case behavior for certain characters. 

I wonder how this is done in other protocols. AMQP seems to have similar semantics for topic exchanges (http://spring.io/blog/2010/06/14/understanding-amqp-the-protocol-used-by-rabbitmq/), except they use '*' in place of MQTT's '+'. 

In regards to QoS, I think this could be handled at the transport level, as they only concern delivery of messages to and from the server, not between clients. For example, if a client sends a QoS 1 PUBLISH, the server responds with a PUBACK immediately after receiving it - irrespective of whether it has been sent to subscribers or not. The same is true for subscriptions: the QoS level is only used to indicate what package exchange indicates succesful delivery from server to client. So I don't think implementing QoS on the MQTT side of things will have any impact on non-MQTT message exchange. 

Best regards,
Troels

Lars Hesel Christensen

unread,
Sep 23, 2015, 3:48:08 AM9/23/15
to phoeni...@googlegroups.com
Hi

As mentioned, phoenix_pubsub_vernemq is exactly just like the other
adapters existing for Redis, pg2 etc.

Initially I also wanted to do a real integration, which could be
achieved by adding a configurable mapping-layer between MQTT and
channels. As the channel messages (%Phoenix.Socket.Message{}) also have
an `event` field, there's no clear way of mapping them - so that logic
would have to be made configurable for it to be usable.

In the end I decided that other solutions would probably be better for
integrating MQTT and channels. Something like building a MQTT transport
or, as mentioned earlier, use a MQTT client and manually handle the
mapping between channels and MQTT.

I might, of course, have missed something when doing the
phoenix_pubsub_vernemq adapter, and it's been a while since I've given
it any attention, so any suggestions or ideas are most welcome.

Cheers,
Lars
>> <http://groups.google.com/group/phoenix-talk>.
>> <https://groups.google.com/d/msgid/phoenix-talk/80738a03-97ff-49c2-8df9-cbeff0c3571e%40googlegroups.com?utm_medium=email&utm_source=footer>.
>> For more options, visit
>> https://groups.google.com/d/optout
>> <https://groups.google.com/d/optout>.
>>
>>
>>
>> --
>> You received this message because you are subscribed to
>> the Google Groups "phoenix-talk" group.
>> To unsubscribe from this group and stop receiving emails
>> from it, send an email to phoenix-talk...@googlegroups.com.
>> To post to this group, send email to
>> phoeni...@googlegroups.com.
>> Visit this group at
>> http://groups.google.com/group/phoenix-talk
>> <http://groups.google.com/group/phoenix-talk>.
>> To view this discussion on the web visit
>> https://groups.google.com/d/msgid/phoenix-talk/dc36010b-6ab7-4bca-bdc4-c8d2c76a0bf6%40googlegroups.com
>> <https://groups.google.com/d/msgid/phoenix-talk/dc36010b-6ab7-4bca-bdc4-c8d2c76a0bf6%40googlegroups.com?utm_medium=email&utm_source=footer>.
>> For more options, visit https://groups.google.com/d/optout
>> <https://groups.google.com/d/optout>.
>
> --
> You received this message because you are subscribed to the
> Google Groups "phoenix-talk" group.
> To unsubscribe from this group and stop receiving emails
> from it, send an email to phoenix-talk...@googlegroups.com.
> To post to this group, send email to phoeni...@googlegroups.com.
> Visit this group at
> http://groups.google.com/group/phoenix-talk
> <http://groups.google.com/group/phoenix-talk>.
> To view this discussion on the web visit
> https://groups.google.com/d/msgid/phoenix-talk/C3B0D579-328D-4422-AE07-39496E5B71BF%40chrismccord.com
> <https://groups.google.com/d/msgid/phoenix-talk/C3B0D579-328D-4422-AE07-39496E5B71BF%40chrismccord.com?utm_medium=email&utm_source=footer>.
>
> For more options, visit https://groups.google.com/d/optout
> <https://groups.google.com/d/optout>.
>
>
>
>
> --
> *Josh Adams*
> CTO | isotope|eleven <http://www.isotope11.com>
> [cell] (205) 215-3957
> [work] (877) 476-8671 x201
>
> [mail]
> 3800 Colonnade Pkwy
> Suite 140
> Birmingham, AL 35242
>
> --
> You received this message because you are subscribed to the Google
> Groups "phoenix-talk" group.
> To unsubscribe from this group and stop receiving emails from it, send
> an email to phoenix-talk...@googlegroups.com
> <mailto:phoenix-talk...@googlegroups.com>.
> To post to this group, send email to phoeni...@googlegroups.com
> <mailto:phoeni...@googlegroups.com>.
> Visit this group at http://groups.google.com/group/phoenix-talk.
> To view this discussion on the web visit
> https://groups.google.com/d/msgid/phoenix-talk/f46b6a4b-5545-4184-a0de-2bc7d4a42f8b%40googlegroups.com
> <https://groups.google.com/d/msgid/phoenix-talk/f46b6a4b-5545-4184-a0de-2bc7d4a42f8b%40googlegroups.com?utm_medium=email&utm_source=footer>.
signature.asc

tra...@gmail.com

unread,
Sep 23, 2015, 10:33:09 AM9/23/15
to phoenix-talk
Thanks for chiming in, Lars!

So, I've been looking a bit at Sockets and Endpoints, trying to figure out how requests are routed to a Transport. Here's what I've got so far. Please correct any mistakes.

When the application is started, Phoenix.Endpoint.Adapter.start_link/2 is called (because any Endpoints have `use Phoenix.Endpoint` in them). Adapter.start_link/2 then finds all children which are part of the application, and starts them. One of those children is Phoenix.Endpoint.Server, thus Phoenix.Endpoint.Server.start_link/2 is called. Server.start_link/2 then calls Phoenix.Endpoint.CowboyHandler.start_link/2, which starts Cowboy with `apply`. 

When CowboyHandler is started, it is configured to dispatch incoming requests to all Sockets in the Endpoint whose Transport have a :cowboy key in the handlers() map. The actual dispatch is done inside Cowboy.

Now, I'm wondering what to do with a Transport that doesn't get incoming requests through Cowboy, for example VerneMQ. Endpoints seem very HTTP-oriented. But in Phoenix.Endpoint.Server, it checks if the Endpoint's config contains :http and/or :https. Let's say I added a :mqtt key to an Endpoints config. Could Phoenix.Endpoint.Server check if the Endpoint config contains :mqtt, and then start an MqttHandler? The MqttHandler would then iterate through the Sockets defined in the Endpoint, looking for those whose Transport have :vernemq in map returned by handlers/0?

Best regards,
Troels

Chris McCord

unread,
Sep 23, 2015, 2:38:39 PM9/23/15
to phoeni...@googlegroups.com
Your understanding of the Endpoint supervision tree is correct, but the way you would wire up an mqtt transport would be decoupled form the Endpoint entirely. Since the endpoint only deals with http traffic, any non-http transport will need to be running under its own server and relaying requests through the `Phoenix.Transport` layer. So start by looking at the `Phoenix.Transports.WebSocket` implementation, but keep in mind you’ll also need to take care of the way the traffic finds its way into your transport. With HTTP, the WS/LP transports get that for free from the endpoint, but something outside of the endpoint flow will need to broker that traffic itself. Make sense?

--
You received this message because you are subscribed to the Google Groups "phoenix-talk" group.
To unsubscribe from this group and stop receiving emails from it, send an email to phoenix-talk...@googlegroups.com.
To post to this group, send email to phoeni...@googlegroups.com.

tra...@gmail.com

unread,
Sep 23, 2015, 4:31:31 PM9/23/15
to phoenix-talk
Yes, that makes sense. It also opens the door to a lot of design decisions. Here's what I'm thinking: I will create a new kind of endpoint for MQTT traffic (let's call it MqttEndpoint), which will be started in the application callback module like HTTP endpoints. In an MqttEndpoint, I would set up Sockets with Transports and Channels - possibly with a slightly different syntax as the `path` argument doesn't really make sense for an MQTT connection. The MqttEndpoint will then be responsible for starting the actual broker, and setting up callbacks so that messages will flow between the broker and the Transports. Does that make sense?

Just to make sure: if the HTTP endpoint and the MqttEndpoint has identical Pubsub config, messages broadcasted from one will reach the other endpoint as well. Right?

Best regards,
Troels

Chris McCord

unread,
Sep 23, 2015, 4:46:28 PM9/23/15
to phoeni...@googlegroups.com
That’s correct. Saying “entirely decoupled form the endpoint” was wrong on my part. You’ll still want your mqtt transport to accept an endpoint option, as well as a UserSocket handler. Then the entry from mqtt into the Transport layer is much like the WebSocket init here:


You’ll call into Transport.connect with the endpoint, user socket handler, etc, and then your mqtt transport handles the rest of the messages in and out just like any http based transport.

tra...@gmail.com

unread,
Sep 24, 2015, 5:42:07 AM9/24/15
to phoenix-talk
OK. Just to be sure, the endpoint and handler are both module names, not pids. Right?

Ed W

unread,
Sep 24, 2015, 6:52:03 AM9/24/15
to phoeni...@googlegroups.com
Hi

Might not be helpful, but mosquitto and a couple of other brokers have a
"websockets" implementation
http://jpmens.net/2014/07/03/the-mosquitto-mqtt-broker-gets-websockets-support/

Whilst I realise the point of hooking up phoenix channels, it may well
be useful to keep the approximate API the same with the various
websockets client libraries?

Very interested to use this if you create something - good luck!

Ed W
Reply all
Reply to author
Forward
0 new messages