MQTT client fails to reconnect


Luca Masini

Feb 2, 2021, 2:29:25 AM
to Quarkus Development mailing list

Hi guys, maybe I'm on the wrong list, and if so I'll switch, but since I use SmallRye only with Quarkus, I consider it an integral part of the ecosystem.

I'm porting a Spring Boot application to Quarkus, and while running my tests I noticed that stopping and restarting Mosquitto doesn't trigger a reconnection of the MQTT client.

I configured the properties of the SmallRye MQTT Connector but nothing happened. Then I noticed this:


and I added my comments.

I really need a workaround or, better, a hint on how to contribute to SmallRye to solve this problem. The Quarkus server is deployed on an edge server without a container orchestrator and must be restarted manually in case of failure, so a health check is not a solution.

Thanks.

clement escoffier

Feb 2, 2021, 8:10:08 AM
to Luca Masini, Quarkus Development mailing list
Hello,

Unfortunately, the bug (or missing feature) is neither in Quarkus nor in SmallRye Reactive Messaging, but in Vert.x (closed as not a bug, just a missing feature: https://github.com/vert-x3/vertx-mqtt/issues/66).
We could try to implement some kind of retry in the connector, but that's not going to be that simple. Maybe adding some kind of reconnection logic in the exception handler (only if we can be sure that the connection is closed at that time) would be ok:

Clement



Luca Masini

Feb 2, 2021, 8:14:05 AM
to clement escoffier, Quarkus Development mailing list
Hi Clement, thank you for your reply.

I would like to try to handle the exception. If possible, can you give me some hints to get started? If you think it could be of general interest, I would be happy to share my work with you.
--
****************************************
http://www.lucamasini.net
http://twitter.com/lmasini
http://www.linkedin.com/pub/luca-masini/7/10/2b9
****************************************

clement escoffier

Feb 2, 2021, 10:00:18 AM
to Luca Masini, Quarkus Development mailing list
Sure,

Open a PR against SmallRye Reactive Messaging.
I believe that changing the memoization so that it releases the connection after a disconnection, and adding a flag that tracks the disconnection (set in the exceptionHandler), should be a good first step.
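
To make the suggestion above concrete, here is a minimal sketch in plain Java of a cached ("memoized") connection that is released once a disconnection has been flagged. It is not the connector's actual code: `MemoizedConnection`, `markDisconnected` and the `Supplier` standing in for the connect call are hypothetical names, and no Vert.x or Mutiny API is used.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Hypothetical helper, not part of SmallRye or Vert.x: caches a connection
// until a disconnection has been flagged, then creates a fresh one.
final class MemoizedConnection<T> {
    private final Supplier<T> connector; // assumed to open a new connection
    private final AtomicBoolean disconnected = new AtomicBoolean(false);
    private T cached;

    MemoizedConnection(Supplier<T> connector) {
        this.connector = connector;
    }

    // What an exception handler would call once the connection is known to be closed.
    void markDisconnected() {
        disconnected.set(true);
    }

    // Returns the cached connection, reconnecting first if the flag was set.
    synchronized T get() {
        boolean stale = disconnected.getAndSet(false);
        if (cached == null || stale) {
            cached = connector.get();
        }
        return cached;
    }
}
```

Callers keep asking `get()` for the connection; only the call that follows `markDisconnected()` pays the cost of reconnecting.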

Clement

Luca Masini

Feb 2, 2021, 10:28:27 AM
to clement escoffier, Quarkus Development mailing list
Sure, I'll ask about best practices on SmallRye; who better than you?

clement escoffier

Feb 2, 2021, 10:32:43 AM
to Luca Masini, Quarkus Development mailing list
I'm maintaining the smallrye reactive messaging project, so I should be able to help you there.
I can introduce you to my rabbit, but not sure it will help :-) 
Ken or Roberto should be able to help.

Clement

Luca Masini

Feb 9, 2021, 11:40:13 AM
to clement escoffier, Quarkus Development mailing list
Hi Clement, a question for you (or your rabbit!).

I was able to add reconnection to the MqttSink. It was really easy once I understood the structure of the code: a Vert.x timer in MqttConnector checks the Clients and, if needed, clears the client reference.

What is really hard is the MqttSource: there, a reactive operator is returned to the caller, and from then on I don't know how to recreate the underlying MqttClient.

Any suggestions?

clement escoffier

Feb 10, 2021, 5:14:38 AM
to Luca Masini, Quarkus Development mailing list
That's the problem: you can't. The best you can do is probably "retry" (onFailure().retry()), which would re-subscribe and, if the client was released, recreate one.
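
The re-subscription idea can be illustrated with a blocking, plain-Java analogue. This is only a sketch of the principle (subscribe again after a failure, so the source gets a chance to build a new client); it is not how a reactive retry operator is implemented, and `ResubscribeOnFailure`, `Source` and `subscribeWithRetry` are hypothetical names.

```java
import java.util.function.Consumer;

// Hypothetical names throughout; a blocking stand-in for a reactive stream.
final class ResubscribeOnFailure {

    // A source that pushes items to the consumer and throws if the connection fails.
    interface Source<T> {
        void subscribe(Consumer<T> onItem);
    }

    // Subscribes again after each failure, up to maxRetries extra attempts.
    static <T> void subscribeWithRetry(Source<T> source, Consumer<T> onItem, int maxRetries) {
        int failures = 0;
        while (true) {
            try {
                source.subscribe(onItem); // a fresh subscription may build a fresh client
                return;                   // completed normally
            } catch (RuntimeException e) {
                if (++failures > maxRetries) {
                    throw e;              // give up and surface the last failure
                }
            }
        }
    }
}
```

The important property is that the downstream consumer stays the same across attempts; only the upstream subscription is recreated.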

Clement

Luca Masini

Feb 16, 2021, 10:23:54 AM
to clement escoffier, Quarkus Development mailing list
Hi Clement, I tried unsuccessfully for 3 days in a row, then decided to switch MQTT client.

We are using HiveMQ in production and it is rock solid, so I converted your MQTT connector from Vert.x to HiveMQ, and now it reconnects very well. I spent a lot of time converting Flowable and Single to Multi and Uni, but everything is now written following the conventions I saw in the original module.

I also used the original properties for the configuration. I would like to ask whether there is interest in this new connector and in a PR for it, or whether automatic reconnection is not that important.

Let me know what you think. If you have time, this is the source code repo to clone:

g...@github.com:masini/smallrye-reactive-messaging.git

We will use this in production as part of migrating a Spring Boot application to Quarkus.

Have a nice day.

clement escoffier

Feb 17, 2021, 8:00:56 AM
to Luca Masini, Quarkus Development mailing list
Hello,

I had a quick look at your connector yesterday. Once a bit more polished (I've made a few comments), we should have it in SmallRye Reactive Messaging (as a new connector, like smallrye-mqtt-hivemq).

I also looked at the existing connector, and wow... yeah... well. The underlying client does not make things easy (I knew it, but I didn't expect it to be so hard). One idea would be to implement a heartbeat using `ping` and pingResponseHandler (if you don't get a response in a timely fashion, trigger a reconnection). That's just an idea so far; I haven't started implementing it.

Clement

Luca Masini

Feb 17, 2021, 8:05:28 AM
to clement escoffier, Quarkus Development mailing list
Hi Clement, thank you very much for the time you spent looking at my code.

I read your comments and I think I applied your suggestions properly.

When you have another 5 minutes to look at the code, I'll do whatever is needed to get a well-working MQTT client.

I really think that reconnection with the current Vert.x MQTT client is very, very hard. After a week I switched to a new connector, and in a few hours it worked :)

PS: I would also like to add something to the Health Check; any suggestion is welcome.

Luca Masini

Feb 17, 2021, 9:08:25 AM
to clement escoffier, Quarkus Development mailing list
Regarding Health Liveness and Readiness I'm trying to use a mandatory topic:


which is "$SYS/broker/uptime"; with a configurable timeout we can tell whether the broker is connected.

I don't think we need a publisher health-check, what do you think?

Julien Viet

Feb 17, 2021, 10:19:21 AM
to luca....@gmail.com, clement escoffier, Quarkus Development mailing list
Hi Luca,

What can we do to improve the Vert.x MQTT client?

Julien

Luca Masini

Feb 17, 2021, 10:27:58 AM
to Julien Viet, clement escoffier, Quarkus Development mailing list
Hi Julien!

What we've experienced in the edge environment is that the broker can be down or restarted, and the current Vert.x MqttClient is unable to reconnect.

I was also unable to understand how to manage that manually in a reactive pipeline, so I wrote a new connector using HiveMQ, which has automatic reconnection out of the box.

Do you think the client can be improved?

There is an old issue:


and it seems that so far no one has been really interested in this.


Julien Viet

Feb 17, 2021, 2:11:39 PM
to Luca Masini, clement escoffier, Quarkus Development mailing list
I think it can be improved, according to the comment:

"It's clear that it's not an issue but just a missing feature, I'm going to close this one."

I think the issue should have been promoted as a feature.

Do you know how to create a test that reproduces this reliably?

Luca Masini

Feb 17, 2021, 11:51:14 PM
to Julien Viet, clement escoffier, Quarkus Development mailing list
Hi Julien, I reproduced it on my workstation using:

brew services restart mosquitto

This was the initial test case; I also tried stopping the broker for a longer time before restarting it.

Soon I'll create a standalone "test" to include in my JUnit suite. If it can help you introduce this feature, I can share it.

Thank you for your interest.

Luca Masini

Feb 18, 2021, 3:58:14 AM
to Julien Viet, clement escoffier, Quarkus Development mailing list
Hi @Clement, I committed a new version that implements Health Checks for MQTT.

It uses the mandatory topic "$SYS/broker/uptime" to check broker liveness and readiness.

There are two new properties, one for the readiness timeout and another for the liveness timeout. The default readiness value is double Mosquitto's default "sys_interval" of 10s.
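
As a rough illustration of the check described above, the following plain-Java sketch reports readiness and liveness from the time of the last message seen on "$SYS/broker/uptime". It is not the committed implementation: the class and method names are hypothetical, and the timeouts are simply passed in by the caller.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical class; names and structure are illustrative only.
final class UptimeTopicHealth {
    private final Duration readinessTimeout;
    private final Duration livenessTimeout;
    private volatile Instant lastMessage; // null until the first message arrives

    UptimeTopicHealth(Duration readinessTimeout, Duration livenessTimeout) {
        this.readinessTimeout = readinessTimeout;
        this.livenessTimeout = livenessTimeout;
    }

    // Call for every message received on "$SYS/broker/uptime".
    void onUptimeMessage(Instant receivedAt) {
        lastMessage = receivedAt;
    }

    boolean isReady(Instant now) {
        return seenWithin(readinessTimeout, now);
    }

    boolean isAlive(Instant now) {
        return seenWithin(livenessTimeout, now);
    }

    // True only if a message was received no longer than `timeout` before `now`.
    private boolean seenWithin(Duration timeout, Instant now) {
        Instant last = lastMessage;
        return last != null && Duration.between(last, now).compareTo(timeout) <= 0;
    }
}
```

With a readiness timeout of 20s (double the 10s "sys_interval"), one missed broker update is tolerated before the check reports not ready. In this sketch both checks fail until the first message arrives, which may or may not be the desired startup behavior.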

Julien Viet

Feb 18, 2021, 4:19:31 AM
to Luca Masini, clement escoffier, Quarkus Development mailing list
thanks,


So I think we could easily use it too.

Julien

Julien Viet

Feb 19, 2021, 6:06:00 AM
to Luca Masini, clement escoffier, Quarkus Development mailing list
So to clarify: let's say the client is in use, the server disconnects, and the client reconnects in the background.

What are the expectations from the client's perspective?

Luca Masini

Feb 19, 2021, 6:20:20 AM
to Julien Viet, clement escoffier, Quarkus Development mailing list
I expect this "workflow" to be transparent, so that a Reactive Subscriber can continue to work as soon as the server is available again and the client reconnects.


