/**
 * Create an MQTT client, configure it for TLS and start connecting to a broker.
 *
 * @param id        Client identifier; copied into the connection and passed to
 *                  mosquitto_new().
 * @param host      Broker host name; copied into the connection.
 * @param port      Broker TCP port.
 * @param username  User name handed to mosquitto_username_pw_set().
 * @param password  Password handed to mosquitto_username_pw_set().
 * @param cb        Callback table; copied by value into the connection.
 *                  Must not be NULL.
 *
 * @return NULL when @p cb is NULL or when client creation / protocol / TLS
 *         setup fails (the partially built connection is handed to
 *         mqtt_disconnect() first). Otherwise the newly allocated connection.
 *         Note that the connection is also returned when mosquitto_connect()
 *         fails or no socket is available; in those cases no GLib fd source
 *         is attached.
 *         NOTE(review): presumably callers inspect conn->connected and retry
 *         or tear down via mqtt_disconnect() - confirm against callers.
 *
 * The process is terminated through assert_log_exit_with_msg() when the
 * libmosquitto runtime is older than v1.5.4.
 */
mqtt_connection_t* mqtt_connect(const char* id, const char *host, int port, const char *username, const char *password, mqtt_cb_t *cb)
{
    int major = 0;
    int minor = 0;
    int revision = 0;
    /* MOSQ_OPT_PROTOCOL_VERSION expects a pointer to an int, not a string. */
    int protocol_version = MQTT_PROTOCOL_V311;

    if (cb == NULL)
    {
        log_err("MQTT callbacks must not be NULL");
        return NULL;
    }

    mqtt_connection_t *conn = g_new0(mqtt_connection_t, 1);
    conn->host = g_strdup(host);
    conn->id = g_strdup(id);
    conn->connected = false;
    conn->callbacks = *cb;
    conn->topics = NULL;
    conn->port = port;

    mosquitto_lib_init();

    /*
     * Require at least v1.5.4. The components are compared in order so that
     * newer releases such as 1.6.0 or 2.0.x are accepted as well.
     */
    mosquitto_lib_version(&major, &minor, &revision);
    bool version_ok = (major > 1) ||
                      (major == 1 && (minor > 5 || (minor == 5 && revision >= 4)));
    assert_log_exit_with_msg(version_ok, "Incompatible libmosquitto version. Minimum version required is v1.5.4");

    /* Clean session; conn is the user data passed back to every callback. */
    conn->mosq = mosquitto_new(id, true, conn);
    if (conn->mosq == NULL)
    {
        log_err("Out of memory");
        goto error;
    }

    // Only allow one message to be queued in the Mosquitto library
    mosquitto_max_inflight_messages_set(conn->mosq, 1);

    // Exponential backoff between 3 and 60 seconds
    mosquitto_reconnect_delay_set(conn->mosq, 3, 60, true);

    if (mosquitto_opts_set(conn->mosq, MOSQ_OPT_PROTOCOL_VERSION, &protocol_version) != MOSQ_ERR_SUCCESS)
    {
        log_err("Failed to set MQTT protocol version");
        goto error;
    }

    /* Trust the CA certificates found in the system certificate directory. */
    if (mosquitto_tls_set(conn->mosq, NULL, "/etc/ssl/certs", NULL, NULL, NULL) != MOSQ_ERR_SUCCESS)
    {
        log_err("Failed to set MQTT client TLS options");
        goto error;
    }

    /* Verify the broker certificate (cert_reqs = 1) and use TLS 1.2. */
    if (mosquitto_tls_opts_set(conn->mosq, 1, "tlsv1.2", NULL) != MOSQ_ERR_SUCCESS)
    {
        log_err("Failed to set MQTT client TLS version");
        goto error;
    }

    mosquitto_username_pw_set(conn->mosq, username, password);

    mosquitto_log_callback_set(conn->mosq, log_callback);
    mosquitto_message_callback_set(conn->mosq, message_callback);
    mosquitto_publish_callback_set(conn->mosq, publish_callback);
    mosquitto_connect_callback_set(conn->mosq, connect_callback);
    mosquitto_disconnect_callback_set(conn->mosq, disconnect_callback);

    log_info("Connecting to %s on port %d with ID %s", conn->host, conn->port, conn->id);

    /*
     * A successful return only means the connection attempt was started.
     * The connect callback is invoked later, when the broker's CONNACK is
     * processed by the libmosquitto network loop functions.
     */
    int status = mosquitto_connect(conn->mosq, conn->host, conn->port, MQTT_DEFAULT_KEEPALIVE);
    if (status != MOSQ_ERR_SUCCESS)
    {
        log_err("Unable to connect to broker: %s", mosquitto_strerror(status));
        return conn;
    }

    int fd = mosquitto_socket(conn->mosq);
    if (fd < 0)
    {
        log_err("Unable to get socket handle");
        return conn;
    }

    /*
     * Let the GLib main loop dispatch do_mosquitto_event whenever the broker
     * socket becomes readable.
     * NOTE(review): only G_IO_IN is watched; do_mosquitto_event presumably
     * has to drive mosquitto_loop_read()/mosquitto_loop_write()/
     * mosquitto_loop_misc() itself for callbacks and keepalives - confirm.
     */
    conn->source = g_unix_fd_add(fd, G_IO_IN, (GUnixFDSourceFunc)do_mosquitto_event, conn);
    return conn;

error:
    mqtt_disconnect(conn);
    return NULL;
}