[Urgent] Not able to reconnect to MQTT broker when multiple readergroups are used in same connection

Chirantan Joshi

Aug 2, 2024, 6:42:22 AM
to open62541
Hi Team,
I am using open62541 v1.4.2 for PubSub connections.
I connect to the broker over MQTT and subscribe to multiple topics on the same connection, i.e. I use multiple ReaderGroups in one PubSubConnection.

The connection works fine while the broker is running. As soon as the broker goes down, the subscriber's connection to the broker is terminated, and the subscriber cannot re-establish it once the broker is back up.

This does not happen if I use only a single ReaderGroup.

Two connections are established:
Screenshot 6.png

After the broker is stopped, the connection is closed and MQTT connections are also closed.
Screenshot 7.png

After this, the server keeps running but does not create a new connection to the MQTT broker once the broker is back up.

I am using the tutorial_pubsub_mqtt_subscribe.c file for testing this connection (https://github.com/open62541/open62541/blob/v1.4.2/examples/pubsub/tutorial_pubsub_mqtt_subscribe.c), calling the addReaderGroup function twice to create two ReaderGroups in the connection.

Could you please explain the reason for this?

Waiting for your response...
Thanks,
Chirantan

Julius Pfrommer

Aug 2, 2024, 6:46:48 AM
to open62541
Probably the closing of the MQTT connection forces the PubSubConnection into an ERROR state.
And the ReaderGroup gets paused.

You can register a callback to be notified about that.

You can have a periodic "retry" to establish the connection again.
But the connection will not retry automatically once it goes to the ERROR state.

Once the connection is back up, the attached ReaderGroup will be operational again.
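
The callback-plus-periodic-retry idea can be sketched as a small self-contained model. This is not the open62541 API: `ConnState`, `Conn`, `on_state_change`, `retry_tick` and `fake_broker_connect` are hypothetical names used only for illustration. In a real server the notification would come from the PubSub state-change callback, and `retry_tick` would be driven by a repeated timer.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical connection states, loosely modelled on a PubSub state machine */
typedef enum { CONN_OPERATIONAL, CONN_ERROR } ConnState;

typedef struct Conn {
    ConnState state;
    bool retryPending;                  /* set when the ERROR state is reported */
    unsigned attempts;                  /* reconnect attempts made so far */
    bool (*try_connect)(struct Conn *); /* hypothetical: true on success */
    void *context;                      /* opaque data for try_connect */
} Conn;

/* State-change notification: only records that a retry is needed.
 * The reconnect itself happens later, from the periodic tick. */
static void on_state_change(Conn *c, ConnState newState) {
    c->state = newState;
    if(newState == CONN_ERROR)
        c->retryPending = true;
}

/* Periodic tick: attempts one reconnect if a retry is pending.
 * Returns true if the connection is operational after this tick. */
static bool retry_tick(Conn *c) {
    if(!c->retryPending)
        return c->state == CONN_OPERATIONAL;
    c->attempts++;
    if(c->try_connect && c->try_connect(c)) {
        c->retryPending = false;
        on_state_change(c, CONN_OPERATIONAL);
        return true;
    }
    return false;
}

/* Demo stand-in for a broker that becomes reachable again after a
 * given number of attempts (the threshold is passed via context). */
static bool fake_broker_connect(Conn *c) {
    const unsigned *upAfter = (const unsigned *)c->context;
    return c->attempts >= *upAfter;
}
```

The point of the split is that the notification path stays cheap and never blocks, while the retry interval is chosen by whoever schedules `retry_tick`.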

Regards,
Julius

Chirantan Joshi

Aug 2, 2024, 7:39:49 AM
to open62541
Hi Julius,
I just checked the parameters and found that the MQTT connection is not storing the topic, so when it tries to re-establish the connection, the topic field in the parameters is null. That is why it closes the broker connection and goes to the error state.

When only one ReaderGroup is used, the topic is stored and found at reconnection time, so it is able to reconnect.

How can we store the topic parameter for multiple ReaderGroups?

Thanks,
Chirantan

Chirantan Joshi

Aug 5, 2024, 1:17:57 PM
to open62541
Hi,
Any updates on how we can use multiple subscriber topics in one PubSub MQTT connection?
Waiting for your reply.
Thanks,
Chirantan

Chirantan Joshi

Aug 7, 2024, 4:27:55 AM
to open62541
Hi Team,
Any updates?

Julius Pfrommer

Aug 12, 2024, 4:16:36 AM
to open62541
Hey Chirantan,

The MQTT implementation has "topic connections" that sit on an underlying "broker connection".
The broker connection is set up automatically when a topic connection is created and there is no matching broker connection already.

So you need to track the "broker connection" and the "topic connection" separately.
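
The relationship between the two kinds of connection can be sketched with a small self-contained model. It does not use the open62541 internals: `BrokerConn`, `TopicConn`, `MqttManager`, `find_or_create_broker` and `open_topic` are hypothetical names. The sketch only illustrates that opening a topic connection reuses a matching broker connection when one exists and creates one otherwise.

```c
#include <stddef.h>
#include <string.h>

#define MAX_BROKERS 4
#define MAX_TOPICS 8

/* Hypothetical model of one underlying connection to a broker */
typedef struct {
    const char *url;
    size_t topicCount; /* topic connections sitting on this broker connection */
} BrokerConn;

/* Hypothetical model of one topic connection (one subscription) */
typedef struct {
    BrokerConn *broker;
    const char *topic;
} TopicConn;

typedef struct {
    BrokerConn brokers[MAX_BROKERS];
    size_t brokerCount;
    TopicConn topics[MAX_TOPICS];
    size_t topicCount;
} MqttManager;

/* Return the broker connection for this URL, creating it if none matches */
static BrokerConn *find_or_create_broker(MqttManager *m, const char *url) {
    for(size_t i = 0; i < m->brokerCount; i++) {
        if(strcmp(m->brokers[i].url, url) == 0)
            return &m->brokers[i];
    }
    if(m->brokerCount == MAX_BROKERS)
        return NULL;
    BrokerConn *b = &m->brokers[m->brokerCount++];
    b->url = url;
    b->topicCount = 0;
    return b;
}

/* Open a topic connection; the broker connection is set up implicitly */
static TopicConn *open_topic(MqttManager *m, const char *url, const char *topic) {
    if(m->topicCount == MAX_TOPICS)
        return NULL;
    BrokerConn *b = find_or_create_broker(m, url);
    if(!b)
        return NULL;
    TopicConn *t = &m->topics[m->topicCount++];
    t->broker = b;
    t->topic = topic;
    b->topicCount++;
    return t;
}
```

With two ReaderGroups on the same broker URL, this model ends up with one broker connection carrying two topic connections, which is why the two have to be tracked separately.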
Please set the log level to DEBUG and show us the output.

To speed this up further I suggest you take the existing unit test for MQTT and modify it to show the error.
Then we are in a good position to solve the issue quickly.

Regards,
Julius

Chirantan Joshi

Aug 26, 2024, 1:27:52 PM
to open62541
Hi Julius,
Thanks for your reply.
I tried to debug this problem, but as I am not very familiar with how the broker and topic connections are created, I could not pin down the exact cause.
Still, one thing I found is this: the broker connection and all topic connections are closed and freed when the broker goes down. A reconnection is attempted, but the callback function does not receive any topic in its "params" parameter. So it cannot establish a connection with the broker and finally goes to the error state.
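
To make the observation concrete, here is a minimal self-contained model of the behaviour one would expect, assuming each ReaderGroup keeps its own copy of the topic. It is not taken from the open62541 sources: `ReaderGroupModel`, `ConnectionModel`, `add_group`, `on_broker_lost`, `on_broker_back` and `clear_groups` are hypothetical names. The topic copy outlives the topic connection, so every group can be re-subscribed after the broker returns.

```c
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

#define MAX_GROUPS 8

/* Hypothetical model of a ReaderGroup that owns its topic string */
typedef struct {
    char *topic;    /* owned copy; kept when the topic connection is freed */
    int subscribed; /* 1 while a topic connection is open */
} ReaderGroupModel;

typedef struct {
    ReaderGroupModel groups[MAX_GROUPS];
    size_t groupCount;
} ConnectionModel;

/* Add a group and store a private copy of its topic. Returns 0 on success. */
static int add_group(ConnectionModel *c, const char *topic) {
    if(c->groupCount == MAX_GROUPS)
        return -1;
    size_t len = strlen(topic) + 1;
    char *copy = (char *)malloc(len);
    if(!copy)
        return -1;
    memcpy(copy, topic, len);
    c->groups[c->groupCount].topic = copy;
    c->groups[c->groupCount].subscribed = 1;
    c->groupCount++;
    return 0;
}

/* Broker went down: topic connections are closed, stored topics are kept */
static void on_broker_lost(ConnectionModel *c) {
    for(size_t i = 0; i < c->groupCount; i++)
        c->groups[i].subscribed = 0;
}

/* Broker is back: re-subscribe every group from its stored topic.
 * Returns the number of groups that could be re-subscribed. */
static size_t on_broker_back(ConnectionModel *c) {
    size_t n = 0;
    for(size_t i = 0; i < c->groupCount; i++) {
        if(c->groups[i].topic == NULL)
            continue; /* no topic available: this group cannot reconnect */
        c->groups[i].subscribed = 1;
        n++;
    }
    return n;
}

/* Release the stored topic copies */
static void clear_groups(ConnectionModel *c) {
    for(size_t i = 0; i < c->groupCount; i++) {
        free(c->groups[i].topic);
        c->groups[i].topic = NULL;
    }
    c->groupCount = 0;
}
```

In this model, a group whose topic is missing at reconnection time is skipped, which corresponds to the null-topic case described above.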

I am using the tutorial_pubsub_mqtt_subscribe.c file for testing this connection. 

I have made some changes to the above file. Almost all of the code is the same; only the parts related to the subscriber topic are changed.

Subscriber code:
/* This work is licensed under a Creative Commons CCZero 1.0 Universal License.
 * See http://creativecommons.org/publicdomain/zero/1.0/ for more information.
 *
 * Copyright (c) 2022 Fraunhofer IOSB (Author: Noel Graf)
 */

#include <open62541/plugin/log_stdout.h>
#include <open62541/plugin/securitypolicy_default.h>
#include <open62541/server.h>
#include <open62541/server_pubsub.h>

#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define CONNECTION_NAME "MQTT Subscriber Connection"
#define TRANSPORT_PROFILE_URI                                                            \
    "http://opcfoundation.org/UA-Profile/Transport/pubsub-mqtt-uadp"
#define MQTT_CLIENT_ID "TESTCLIENTPUBSUBMQTTSUBSCRIBE"
#define CONNECTIONOPTION_NAME "mqttClientId"
#define SUBSCRIBER_METADATAQUEUENAME "MetaDataTopic"
#define SUBSCRIBER_METADATAUPDATETIME 0
#define BROKER_ADDRESS_URL "opc.mqtt://127.0.0.1:1883"

// Uncomment the following line to enable MQTT login for the example
// #define EXAMPLE_USE_MQTT_LOGIN

#ifdef EXAMPLE_USE_MQTT_LOGIN
#define LOGIN_OPTION_COUNT 2
#define USERNAME_OPTION_NAME "mqttUsername"
#define PASSWORD_OPTION_NAME "mqttPassword"
#define MQTT_USERNAME "open62541user"
#define MQTT_PASSWORD "open62541"
#endif

// Uncomment the following line to enable MQTT via TLS for the example
//#define EXAMPLE_USE_MQTT_TLS

#ifdef EXAMPLE_USE_MQTT_TLS
#define TLS_OPTION_COUNT 2
#define USE_TLS_OPTION_NAME "mqttUseTLS"
#define MQTT_CA_FILE_PATH_OPTION_NAME "mqttCaFilePath"
#define CA_FILE_PATH "/path/to/server.cert"
#endif

#if defined(UA_ENABLE_PUBSUB_ENCRYPTION) /* && !defined(UA_ENABLE_JSON_ENCODING)*/
#define UA_AES128CTR_SIGNING_KEY_LENGTH 32
#define UA_AES128CTR_KEY_LENGTH 32
#define UA_AES128CTR_KEYNONCE_LENGTH 4

UA_Byte signingKey[UA_AES128CTR_SIGNING_KEY_LENGTH] = {0};
UA_Byte encryptingKey[UA_AES128CTR_KEY_LENGTH] = {0};
UA_Byte keyNonce[UA_AES128CTR_KEYNONCE_LENGTH] = {0};
#endif

//#ifdef UA_ENABLE_JSON_ENCODING
// static UA_Boolean useJson = true;
//#else
static UA_Boolean useJson = false;
//#endif

char *SUBSCRIBER_TOPIC = "prosysopc/uadp/data/urn:DESKTOP-NA2TQ61:OPCUA:SimulationServer/"
                         "WriterGroup1/VariableDataSetWriter";

UA_NodeId connectionIdent;
UA_NodeId subscribedDataSetIdent;
UA_NodeId readerGroupIdent;

UA_DataSetReaderConfig readerConfig;

static void
fillTestDataSetMetaData(UA_DataSetMetaDataType *pMetaData);

static UA_StatusCode
addPubSubConnection(UA_Server *server, char *addressUrl) {
    UA_StatusCode retval = UA_STATUSCODE_GOOD;
    /* Details about the connection configuration and handling are located
     * in the pubsub connection tutorial */
    UA_PubSubConnectionConfig connectionConfig;
    memset(&connectionConfig, 0, sizeof(connectionConfig));
    connectionConfig.name = UA_STRING(CONNECTION_NAME);
    connectionConfig.transportProfileUri = UA_STRING(TRANSPORT_PROFILE_URI);
    connectionConfig.enabled = UA_TRUE;

    /* configure address of the mqtt broker (local on default port) */
    UA_NetworkAddressUrlDataType networkAddressUrl = {UA_STRING_NULL,
                                                      UA_STRING(addressUrl)};
    UA_Variant_setScalar(&connectionConfig.address, &networkAddressUrl,
                         &UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE]);
    /* Changed to static publisherId from random generation to identify
     * the publisher on Subscriber side */
    connectionConfig.publisherIdType = UA_PUBLISHERIDTYPE_UINT16;
    connectionConfig.publisherId.uint16 = 2234;

    /* configure options, set mqtt client id */
    /* #ifdef EXAMPLE_USE_MQTT_LOGIN */
    /*     + LOGIN_OPTION_COUNT */
    /* #endif */
    /* #ifdef EXAMPLE_USE_MQTT_TLS */
    /*     + TLS_OPTION_COUNT */
    /* #endif */

    UA_KeyValuePair connectionOptions[1];

    UA_String mqttClientId = UA_STRING(MQTT_CLIENT_ID);
    connectionOptions[0].key = UA_QUALIFIEDNAME(0, CONNECTIONOPTION_NAME);
    UA_Variant_setScalar(&connectionOptions[0].value, &mqttClientId,
                         &UA_TYPES[UA_TYPES_STRING]);

    /* #ifdef EXAMPLE_USE_MQTT_LOGIN */
    /*     connectionOptions[connectionOptionIndex].key = UA_QUALIFIEDNAME(0,
     * USERNAME_OPTION_NAME); */
    /*     UA_String mqttUsername = UA_STRING(MQTT_USERNAME); */
    /*     UA_Variant_setScalar(&connectionOptions[connectionOptionIndex++].value,
     * &mqttUsername, &UA_TYPES[UA_TYPES_STRING]); */

    /*     connectionOptions[connectionOptionIndex].key = UA_QUALIFIEDNAME(0,
     * PASSWORD_OPTION_NAME); */
    /*     UA_String mqttPassword = UA_STRING(MQTT_PASSWORD); */
    /*     UA_Variant_setScalar(&connectionOptions[connectionOptionIndex++].value,
     * &mqttPassword, &UA_TYPES[UA_TYPES_STRING]); */
    /* #endif */

    /* #ifdef EXAMPLE_USE_MQTT_TLS */
    /*     connectionOptions[connectionOptionIndex].key = UA_QUALIFIEDNAME(0,
     * USE_TLS_OPTION_NAME); */
    /*     UA_Boolean mqttUseTLS = true; */
    /*     UA_Variant_setScalar(&connectionOptions[connectionOptionIndex++].value,
     * &mqttUseTLS, &UA_TYPES[UA_TYPES_BOOLEAN]); */

    /*     connectionOptions[connectionOptionIndex].key = UA_QUALIFIEDNAME(0,
     * MQTT_CA_FILE_PATH_OPTION_NAME); */
    /*     UA_String mqttCaFile = UA_STRING(CA_FILE_PATH); */
    /*     UA_Variant_setScalar(&connectionOptions[connectionOptionIndex++].value,
     * &mqttCaFile, &UA_TYPES[UA_TYPES_STRING]); */
    /* #endif */

    connectionConfig.connectionProperties.map = connectionOptions;
    connectionConfig.connectionProperties.mapSize = 1;

    retval |= UA_Server_addPubSubConnection(server, &connectionConfig, &connectionIdent);

    return retval;
}

/**
 * **ReaderGroup**
 *
 * ReaderGroup is used to group a list of DataSetReaders. All ReaderGroups are
 * created within a PubSubConnection and automatically deleted if the connection
 * is removed. All network message related filters are only available in the
 * DataSetReader. */
/* Add ReaderGroup to the created connection */
static UA_StatusCode
addReaderGroup(UA_Server *server) {
    if(server == NULL) {
        return UA_STATUSCODE_BADINTERNALERROR;
    }

    UA_StatusCode retval = UA_STATUSCODE_GOOD;
    UA_ReaderGroupConfig readerGroupConfig;
    memset(&readerGroupConfig, 0, sizeof(UA_ReaderGroupConfig));
    readerGroupConfig.name = UA_STRING("ReaderGroup1");
    if(useJson)
        readerGroupConfig.encodingMimeType = UA_PUBSUB_ENCODING_JSON;

    /* configure the mqtt subscribe topic */
    UA_BrokerWriterGroupTransportDataType brokerTransportSettings;
    memset(&brokerTransportSettings, 0, sizeof(UA_BrokerWriterGroupTransportDataType));
    /* Assign the topic to which the MQTT subscription should be made */
    /*ToDo: Pass the topic as argument from the reader group */
    brokerTransportSettings.queueName = UA_STRING(SUBSCRIBER_TOPIC);
    brokerTransportSettings.resourceUri = UA_STRING_NULL;
    brokerTransportSettings.authenticationProfileUri = UA_STRING_NULL;

    /* Choose the QOS Level for MQTT */
    brokerTransportSettings.requestedDeliveryGuarantee =
        UA_BROKERTRANSPORTQUALITYOFSERVICE_BESTEFFORT;

    /* Encapsulate config in transportSettings */
    UA_ExtensionObject transportSettings;
    memset(&transportSettings, 0, sizeof(UA_ExtensionObject));
    transportSettings.encoding = UA_EXTENSIONOBJECT_DECODED;
    transportSettings.content.decoded.type =
        &UA_TYPES[UA_TYPES_BROKERDATASETREADERTRANSPORTDATATYPE];
    transportSettings.content.decoded.data = &brokerTransportSettings;

    readerGroupConfig.transportSettings = transportSettings;

    //#if defined(UA_ENABLE_PUBSUB_ENCRYPTION) /*&& !defined(UA_ENABLE_JSON_ENCODING)*/
    //    /* Encryption settings */
    //    UA_ServerConfig *config = UA_Server_getConfig(server);
    //    readerGroupConfig.securityMode = UA_MESSAGESECURITYMODE_SIGNANDENCRYPT;
    //    readerGroupConfig.securityPolicy = &config->pubSubConfig.securityPolicies[0];
    //#endif

    retval |= UA_Server_addReaderGroup(server, connectionIdent, &readerGroupConfig,
                                       &readerGroupIdent);
    //#if defined(UA_ENABLE_PUBSUB_ENCRYPTION) /*&& !defined(UA_ENABLE_JSON_ENCODING)*/
    //    /* Add the encryption key informaton */
    //    UA_ByteString sk = {UA_AES128CTR_SIGNING_KEY_LENGTH, signingKey};
    //    UA_ByteString ek = {UA_AES128CTR_KEY_LENGTH, encryptingKey};
    //    UA_ByteString kn = {UA_AES128CTR_KEYNONCE_LENGTH, keyNonce};
    //
    //    // TODO security token not necessary for readergroup (extracted from
    //    security-header) retval |= UA_Server_setReaderGroupEncryptionKeys(server,
    //    readerGroupIdent, 1, sk, ek, kn);
    //#endif
    retval |= UA_Server_setReaderGroupOperational(server, readerGroupIdent);

    return retval;
}

/**
 * **DataSetReader**
 *
 * DataSetReader can receive NetworkMessages with the DataSetMessage
 * of interest sent by the Publisher. DataSetReader provides
 * the configuration necessary to receive and process DataSetMessages
 * on the Subscriber side. DataSetReader must be linked with a
 * SubscribedDataSet and be contained within a ReaderGroup. */
/* Add DataSetReader to the ReaderGroup */
static UA_StatusCode
addDataSetReader(UA_Server *server) {
    if(server == NULL) {
        return UA_STATUSCODE_BADINTERNALERROR;
    }

    UA_StatusCode retval = UA_STATUSCODE_GOOD;
    memset(&readerConfig, 0, sizeof(UA_DataSetReaderConfig));
    readerConfig.name = UA_STRING("DataSet Reader 1");
    /* Parameters to filter which DataSetMessage has to be processed
     * by the DataSetReader */
    /* The following parameters are used to show that the data published by
     * tutorial_pubsub_mqtt_publish.c is being subscribed and is being updated in
     * the information model */
    UA_String publisherIdentifier =
        UA_STRING("urn:DESKTOP-NA2TQ61:OPCUA:SimulationServer");
    readerConfig.publisherId.type = &UA_TYPES[UA_TYPES_STRING];
    readerConfig.publisherId.data = &publisherIdentifier;
    readerConfig.writerGroupId = 1;
    readerConfig.dataSetWriterId = 1;
#ifdef UA_ENABLE_PUBSUB_MONITORING
    readerConfig.messageReceiveTimeout = 10;
#endif
   
    /* Setting up Meta data configuration in DataSetReader */
    fillTestDataSetMetaData(&readerConfig.dataSetMetaData);

    retval |= UA_Server_addDataSetReader(server, readerGroupIdent, &readerConfig,
                                         &subscribedDataSetIdent);
    return retval;
}

static UA_StatusCode
addDataSetReader2(UA_Server *server) {
    if(server == NULL) {
        return UA_STATUSCODE_BADINTERNALERROR;
    }

    UA_StatusCode retval = UA_STATUSCODE_GOOD;
    memset(&readerConfig, 0, sizeof(UA_DataSetReaderConfig));
    readerConfig.name = UA_STRING("DataSet Reader 2");
    /* Parameters to filter which DataSetMessage has to be processed
     * by the DataSetReader */
    /* The following parameters are used to show that the data published by
     * tutorial_pubsub_mqtt_publish.c is being subscribed and is being updated in
     * the information model */
    UA_String publisherIdentifier =
        UA_STRING("urn:DESKTOP-NA2TQ61:OPCUA:SimulationServer");
    readerConfig.publisherId.type = &UA_TYPES[UA_TYPES_STRING];
    readerConfig.publisherId.data = &publisherIdentifier;
    readerConfig.writerGroupId = 1;
    readerConfig.dataSetWriterId = 2;
#ifdef UA_ENABLE_PUBSUB_MONITORING
    readerConfig.messageReceiveTimeout = 10;
#endif

    /* Setting up Meta data configuration in DataSetReader */
    fillTestDataSetMetaData(&readerConfig.dataSetMetaData);

    retval |= UA_Server_addDataSetReader(server, readerGroupIdent, &readerConfig,
                                         &subscribedDataSetIdent);
    return retval;
}

/**
 * **SubscribedDataSet**
 *
 * Set SubscribedDataSet type to TargetVariables data type.
 * Add subscribedvariables to the DataSetReader */
static UA_StatusCode
addSubscribedVariables(UA_Server *server, UA_NodeId dataSetReaderId) {
    if(server == NULL)
        return UA_STATUSCODE_BADINTERNALERROR;

    UA_StatusCode retval = UA_STATUSCODE_GOOD;
    UA_NodeId folderId;
    UA_String folderName = readerConfig.dataSetMetaData.name;
    UA_ObjectAttributes oAttr = UA_ObjectAttributes_default;
    UA_QualifiedName folderBrowseName;
    if(folderName.length > 0) {
        oAttr.displayName.locale = UA_STRING("en-US");
        oAttr.displayName.text = folderName;
        folderBrowseName.namespaceIndex = 1;
        folderBrowseName.name = folderName;
    } else {
        oAttr.displayName = UA_LOCALIZEDTEXT("en-US", "Subscribed Variables");
        folderBrowseName = UA_QUALIFIEDNAME(1, "Subscribed Variables");
    }

    UA_Server_addObjectNode(
        server, UA_NODEID_NULL, UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER),
        UA_NODEID_NUMERIC(0, UA_NS0ID_ORGANIZES), folderBrowseName,
        UA_NODEID_NUMERIC(0, UA_NS0ID_BASEOBJECTTYPE), oAttr, NULL, &folderId);

    /**
     * **TargetVariables**
     *
     * The SubscribedDataSet option TargetVariables defines a list of Variable mappings
     * between received DataSet fields and target Variables in the Subscriber
     * AddressSpace. The values subscribed from the Publisher are updated in the value
     * field of these variables */
    /* Create the TargetVariables with respect to DataSetMetaData fields */
    UA_FieldTargetVariable *targetVars = (UA_FieldTargetVariable *)UA_calloc(
        readerConfig.dataSetMetaData.fieldsSize, sizeof(UA_FieldTargetVariable));
    for(size_t i = 0; i < readerConfig.dataSetMetaData.fieldsSize; i++) {
        /* Variable to subscribe data */
        UA_VariableAttributes vAttr = UA_VariableAttributes_default;
        UA_LocalizedText_copy(&readerConfig.dataSetMetaData.fields[i].description,
                              &vAttr.description);
        vAttr.displayName.locale = UA_STRING("en-US");
        vAttr.displayName.text = readerConfig.dataSetMetaData.fields[i].name;
        vAttr.dataType = readerConfig.dataSetMetaData.fields[i].dataType;

        UA_NodeId newNode;
        retval |= UA_Server_addVariableNode(
            server, UA_NODEID_NUMERIC(1, (UA_UInt32)i + 50000), folderId,
            UA_NODEID_NUMERIC(0, UA_NS0ID_HASCOMPONENT),
            UA_QUALIFIEDNAME(1, (char *)readerConfig.dataSetMetaData.fields[i].name.data),
            UA_NODEID_NUMERIC(0, UA_NS0ID_BASEDATAVARIABLETYPE), vAttr, NULL, &newNode);

        /* For creating Targetvariables */
        UA_FieldTargetDataType_init(&targetVars[i].targetVariable);
        targetVars[i].targetVariable.attributeId = UA_ATTRIBUTEID_VALUE;
        targetVars[i].targetVariable.targetNodeId = newNode;
    }

    retval = UA_Server_DataSetReader_createTargetVariables(
        server, dataSetReaderId, readerConfig.dataSetMetaData.fieldsSize, targetVars);
    for(size_t i = 0; i < readerConfig.dataSetMetaData.fieldsSize; i++)
        UA_FieldTargetDataType_clear(&targetVars[i].targetVariable);

    UA_free(targetVars);
    UA_free(readerConfig.dataSetMetaData.fields);
    return retval;
}

/**
 * **DataSetMetaData**
 *
 * The DataSetMetaData describes the content of a DataSet. It provides the information
 * necessary to decode DataSetMessages on the Subscriber side. DataSetMessages received
 * from the Publisher are decoded into DataSet and each field is updated in the Subscriber
 * based on datatype match of TargetVariable fields of Subscriber and
 * PublishedDataSetFields of Publisher */
/* Define MetaData for TargetVariables */
static void
fillTestDataSetMetaData(UA_DataSetMetaDataType *pMetaData) {
    if(pMetaData == NULL) {
        return;
    }

    UA_DataSetMetaDataType_init(pMetaData);
    pMetaData->name = UA_STRING("DataSet 1");

    /* Static definition of six fields: one Int32 field followed by five
     * Double fields. Note that the field names do not reflect the data
     * types. */
    pMetaData->fieldsSize = 6;
    pMetaData->fields = (UA_FieldMetaData *)UA_Array_new(
        pMetaData->fieldsSize, &UA_TYPES[UA_TYPES_FIELDMETADATA]);

    /* Int32 DataType */
    UA_FieldMetaData_init(&pMetaData->fields[0]);
    UA_NodeId_copy(&UA_TYPES[UA_TYPES_INT32].typeId, &pMetaData->fields[0].dataType);
    pMetaData->fields[0].builtInType = UA_NS0ID_INT32;
    pMetaData->fields[0].name = UA_STRING("Int32");
    pMetaData->fields[0].valueRank = -1; /* scalar */

    /* Double DataType */
    UA_FieldMetaData_init(&pMetaData->fields[1]);
    UA_NodeId_copy(&UA_TYPES[UA_TYPES_DOUBLE].typeId, &pMetaData->fields[1].dataType);
    pMetaData->fields[1].builtInType = UA_NS0ID_DOUBLE;
    pMetaData->fields[1].name = UA_STRING("Int32");
    pMetaData->fields[1].valueRank = -1; /* scalar */

    /* Double DataType */
    UA_FieldMetaData_init(&pMetaData->fields[2]);
    UA_NodeId_copy(&UA_TYPES[UA_TYPES_DOUBLE].typeId, &pMetaData->fields[2].dataType);
    pMetaData->fields[2].builtInType = UA_NS0ID_DOUBLE;
    pMetaData->fields[2].name = UA_STRING("Int64");
    pMetaData->fields[2].valueRank = -1; /* scalar */

    /* Double DataType */
    UA_FieldMetaData_init(&pMetaData->fields[3]);
    UA_NodeId_copy(&UA_TYPES[UA_TYPES_DOUBLE].typeId, &pMetaData->fields[3].dataType);
    pMetaData->fields[3].builtInType = UA_NS0ID_DOUBLE;
    pMetaData->fields[3].name = UA_STRING("BoolToggle");
    pMetaData->fields[3].valueRank = -1; /* scalar */

    /* Double DataType */
    UA_FieldMetaData_init(&pMetaData->fields[4]);
    UA_NodeId_copy(&UA_TYPES[UA_TYPES_DOUBLE].typeId, &pMetaData->fields[4].dataType);
    pMetaData->fields[4].builtInType = UA_NS0ID_DOUBLE;
    pMetaData->fields[4].name = UA_STRING("BoolToggle");
    pMetaData->fields[4].valueRank = -1; /* scalar */

    /* Double DataType */
    UA_FieldMetaData_init(&pMetaData->fields[5]);
    UA_NodeId_copy(&UA_TYPES[UA_TYPES_DOUBLE].typeId, &pMetaData->fields[5].dataType);
    pMetaData->fields[5].builtInType = UA_NS0ID_DOUBLE;
    pMetaData->fields[5].name = UA_STRING("BoolToggle");
    pMetaData->fields[5].valueRank = -1; /* scalar */
}

UA_Server *server = NULL;
/**
 * Followed by the main server code, making use of the above definitions */
UA_Boolean running = true;
static void
stopHandler(int sign) {
    (void)sign;
    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "received ctrl-c");
    /* Only clear the flag here. UA_Server_run() then returns and main()
     * deletes the server in its cleanup path. */
    running = false;
}
int
main(int argc, char **argv) {
    char *addressUrl = BROKER_ADDRESS_URL;

    signal(SIGINT, stopHandler);
    signal(SIGTERM, stopHandler);
    /* Return value initialized to Status Good */
    server = UA_Server_new();

#if defined(UA_ENABLE_PUBSUB_ENCRYPTION)
    /* Instantiate the PubSub SecurityPolicy */
    UA_ServerConfig *config = UA_Server_getConfig(server);
    config->pubSubConfig.securityPolicies =
        (UA_PubSubSecurityPolicy *)UA_malloc(sizeof(UA_PubSubSecurityPolicy));
    config->pubSubConfig.securityPoliciesSize = 1;
    UA_PubSubSecurityPolicy_Aes256Ctr(config->pubSubConfig.securityPolicies,
                                      config->logging);
#endif

    /* API calls */
    /* Add PubSubConnection */
    UA_StatusCode retval = addPubSubConnection(server, addressUrl);
    if(retval != UA_STATUSCODE_GOOD)
        goto cleanup;
    /* Add ReaderGroup to the created PubSubConnection */
    retval |= addReaderGroup(server);
    if(retval != UA_STATUSCODE_GOOD)
        goto cleanup;

    /* Add DataSetReader to the created ReaderGroup */
    retval |= addDataSetReader(server);
    if(retval != UA_STATUSCODE_GOOD)
        goto cleanup;
   
    /* Add SubscribedVariables to the created DataSetReader */
    retval |= addSubscribedVariables(server, subscribedDataSetIdent);
    if(retval != UA_STATUSCODE_GOOD)
        goto cleanup;

    SUBSCRIBER_TOPIC = "prosysopc/uadp/data/urn:DESKTOP-NA2TQ61:OPCUA:SimulationServer/"
                       "WriterGroup1/DataSetWriter2";

    retval |= addReaderGroup(server);
    if(retval != UA_STATUSCODE_GOOD)
        goto cleanup;

    retval |= addDataSetReader2(server);
    if(retval != UA_STATUSCODE_GOOD)
        goto cleanup;

    /* Add SubscribedVariables to the created DataSetReader */
    retval |= addSubscribedVariables(server, subscribedDataSetIdent);
    if(retval != UA_STATUSCODE_GOOD)
        goto cleanup;

    // retval = UA_Server_runUntilInterrupt(server);
    retval = UA_Server_run(server, &running);
cleanup:
    UA_Server_delete(server);
    return retval == UA_STATUSCODE_GOOD ? EXIT_SUCCESS : EXIT_FAILURE;
}


Logs Received:
[2024-08-13 17:15:17.275 (UTC+0550)] info/eventloop Starting the EventLoop
[2024-08-13 17:15:17.275 (UTC+0550)] info/eventloop Interrupt | Starting the InterruptManager
[2024-08-13 17:15:17.275 (UTC+0550)] info/eventloop Interrupt | Socket pair for the self-pipe: 336,196
[2024-08-13 17:15:17.275 (UTC+0550)] info/eventloop Registering fd: 336
[2024-08-13 17:15:17.275 (UTC+0550)] warn/server AccessControl: Unconfigured AccessControl. Users have all permissions.
[2024-08-13 17:15:17.275 (UTC+0550)] info/server AccessControl: Anonymous login is enabled
[2024-08-13 17:15:17.275 (UTC+0550)] info/server x509 Certificate Authentication configured, but no encrypting SecurityPolicy. This can leak credentials on the network.
[2024-08-13 17:15:17.275 (UTC+0550)] info/session TCP 0 | SC 0 | Session "Administrator" | AddNode (i=15303): No TypeDefinition. Use the default TypeDefinition for the Variable/Object
[2024-08-13 17:15:17.275 (UTC+0550)] info/session TCP 0 | SC 0 | Session "Administrator" | AddNode (i=25451): No TypeDefinition. Use the default TypeDefinition for the Variable/Object
[2024-08-13 17:15:17.292 (UTC+0550)] info/session TCP 0 | SC 0 | Session "Administrator" | AddNode (ns=1;i=50809): The value is empty. But this is only allowed for BaseDataType. Create a matching default value.
[2024-08-13 17:15:17.292 (UTC+0550)] info/session TCP 0 | SC 0 | Session "Administrator" | AddNode (ns=1;i=50810): The value is empty. But this is only allowed for BaseDataType. Create a matching default value.
[2024-08-13 17:15:17.292 (UTC+0550)] info/session TCP 0 | SC 0 | Session "Administrator" | AddNode (ns=1;i=50811): The value is empty. But this is only allowed for BaseDataType. Create a matching default value.
[2024-08-13 17:15:17.292 (UTC+0550)] info/session TCP 0 | SC 0 | Session "Administrator" | AddNode (ns=1;i=50813): The value is empty. But this is only allowed for BaseDataType. Create a matching default value.
[2024-08-13 17:15:17.292 (UTC+0550)] info/session TCP 0 | SC 0 | Session "Administrator" | AddNode (ns=1;i=50816): The value is empty. But this is only allowed for BaseDataType. Create a matching default value.
[2024-08-13 17:15:17.292 (UTC+0550)] info/session TCP 0 | SC 0 | Session "Administrator" | AddNode (ns=1;i=50818): The value is empty. But this is only allowed for BaseDataType. Create a matching default value.
[2024-08-13 17:15:17.292 (UTC+0550)] info/network TCP | Open a connection to "127.0.0.1" on port 1883
[2024-08-13 17:15:17.292 (UTC+0550)] info/eventloop Registering fd: 352
[2024-08-13 17:15:17.292 (UTC+0550)] info/network TCP 352 | Opening a connection to "127.0.0.1" on port 1883
[2024-08-13 17:15:17.292 (UTC+0550)] info/network MQTT-TCP 352 | Network callback
[2024-08-13 17:15:17.292 (UTC+0550)] info/network MQTT-TCP 352 | Created broker connection
[2024-08-13 17:15:17.292 (UTC+0550)] info/session TCP 0 | SC 0 | Session "Administrator" | AddNode (ns=1;i=50823): The value is empty. But this is only allowed for BaseDataType. Create a matching default value.
[2024-08-13 17:15:17.292 (UTC+0550)] info/session TCP 0 | SC 0 | Session "Administrator" | AddNode (ns=1;i=50824): The value is empty. But this is only allowed for BaseDataType. Create a matching default value.
[2024-08-13 17:15:17.292 (UTC+0550)] info/session TCP 0 | SC 0 | Session "Administrator" | AddNode (ns=1;i=50825): The value is empty. But this is only allowed for BaseDataType. Create a matching default value.
[2024-08-13 17:15:17.292 (UTC+0550)] info/session TCP 0 | SC 0 | Session "Administrator" | AddNode (ns=1;i=50826): The value is empty. But this is only allowed for BaseDataType. Create a matching default value.
[2024-08-13 17:15:17.292 (UTC+0550)] info/session TCP 0 | SC 0 | Session "Administrator" | AddNode (ns=1;i=50827): The value is empty. But this is only allowed for BaseDataType. Create a matching default value.
[2024-08-13 17:15:17.292 (UTC+0550)] info/session TCP 0 | SC 0 | Session "Administrator" | AddNode (ns=1;i=50829): The value is empty. But this is only allowed for BaseDataType. Create a matching default value.
[2024-08-13 17:15:17.292 (UTC+0550)] info/session TCP 0 | SC 0 | Session "Administrator" | AddNode (ns=1;i=50000): The value is empty. But this is only allowed for BaseDataType. Create a matching default value.
[2024-08-13 17:15:17.292 (UTC+0550)] info/session TCP 0 | SC 0 | Session "Administrator" | AddNode (ns=1;i=50001): The value is empty. But this is only allowed for BaseDataType. Create a matching default value.
[2024-08-13 17:15:17.292 (UTC+0550)] info/session TCP 0 | SC 0 | Session "Administrator" | AddNode (ns=1;i=50002): The value is empty. But this is only allowed for BaseDataType. Create a matching default value.
[2024-08-13 17:15:17.292 (UTC+0550)] info/session TCP 0 | SC 0 | Session "Administrator" | AddNode (ns=1;i=50003): The value is empty. But this is only allowed for BaseDataType. Create a matching default value.
[2024-08-13 17:15:17.292 (UTC+0550)] info/session TCP 0 | SC 0 | Session "Administrator" | AddNode (ns=1;i=50004): The value is empty. But this is only allowed for BaseDataType. Create a matching default value.
[2024-08-13 17:15:17.292 (UTC+0550)] info/session TCP 0 | SC 0 | Session "Administrator" | AddNode (ns=1;i=50005): The value is empty. But this is only allowed for BaseDataType. Create a matching default value.
[2024-08-13 17:15:17.292 (UTC+0550)] info/session TCP 0 | SC 0 | Session "Administrator" | AddNode (ns=1;i=50838): The value is empty. But this is only allowed for BaseDataType. Create a matching default value.
[2024-08-13 17:15:17.292 (UTC+0550)] info/session TCP 0 | SC 0 | Session "Administrator" | AddNode (ns=1;i=50843): The value is empty. But this is only allowed for BaseDataType. Create a matching default value.
[2024-08-13 17:15:17.292 (UTC+0550)] info/session TCP 0 | SC 0 | Session "Administrator" | AddNode (ns=1;i=50844): The value is empty. But this is only allowed for BaseDataType. Create a matching default value.
[2024-08-13 17:15:17.292 (UTC+0550)] info/session TCP 0 | SC 0 | Session "Administrator" | AddNode (ns=1;i=50845): The value is empty. But this is only allowed for BaseDataType. Create a matching default value.
[2024-08-13 17:15:17.292 (UTC+0550)] info/session TCP 0 | SC 0 | Session "Administrator" | AddNode (ns=1;i=50846): The value is empty. But this is only allowed for BaseDataType. Create a matching default value.
[2024-08-13 17:15:17.292 (UTC+0550)] info/session TCP 0 | SC 0 | Session "Administrator" | AddNode (ns=1;i=50847): The value is empty. But this is only allowed for BaseDataType. Create a matching default value.
[2024-08-13 17:15:17.292 (UTC+0550)] info/session TCP 0 | SC 0 | Session "Administrator" | AddNode (ns=1;i=50849): The value is empty. But this is only allowed for BaseDataType. Create a matching default value.
[2024-08-13 17:15:17.292 (UTC+0550)] info/session TCP 0 | SC 0 | Session "Administrator" | AddNode: Node could not add the new node to the nodestore with error code
[2024-08-13 17:15:17.292 (UTC+0550)] info/session TCP 0 | SC 0 | Session "Administrator" | AddNode: Node could not add the new node to the nodestore with error code
[2024-08-13 17:15:17.292 (UTC+0550)] info/session TCP 0 | SC 0 | Session "Administrator" | AddNode: Node could not add the new node to the nodestore with error code
[2024-08-13 17:15:17.292 (UTC+0550)] info/session TCP 0 | SC 0 | Session "Administrator" | AddNode: Node could not add the new node to the nodestore with error code
[2024-08-13 17:15:17.292 (UTC+0550)] info/session TCP 0 | SC 0 | Session "Administrator" | AddNode: Node could not add the new node to the nodestore with error code
[2024-08-13 17:15:17.292 (UTC+0550)] info/session TCP 0 | SC 0 | Session "Administrator" | AddNode: Node could not add the new node to the nodestore with error code
[2024-08-13 17:15:17.292 (UTC+0550)] warn/server Maximum SecureChannels count not enough for the maximum Sessions count
[2024-08-13 17:15:17.292 (UTC+0550)] info/network TCP | Listening on all interfaces
[2024-08-13 17:15:17.292 (UTC+0550)] info/network TCP 452 | Creating listen socket for "DESKTOP-NA2TQ61" (with local hostname "DESKTOP-NA2TQ61") on port 4840
[2024-08-13 17:15:17.292 (UTC+0550)] info/eventloop Registering fd: 452
[2024-08-13 17:15:17.292 (UTC+0550)] info/server New DiscoveryUrl added: opc.tcp://DESKTOP-NA2TQ61:4840
[2024-08-13 17:15:17.292 (UTC+0550)] info/network TCP 612 | Creating listen socket for "DESKTOP-NA2TQ61" (with local hostname "DESKTOP-NA2TQ61") on port 4840
[2024-08-13 17:15:17.292 (UTC+0550)] info/eventloop Registering fd: 612
[2024-08-13 17:15:17.292 (UTC+0550)] info/eventloop Processing event 2 on fd 352
[2024-08-13 17:15:17.292 (UTC+0550)] info/network TCP 352 | Activity on the socket
[2024-08-13 17:15:17.292 (UTC+0550)] info/network TCP 352 | Opening a new connection
[2024-08-13 17:15:17.292 (UTC+0550)] info/network MQTT-TCP 352 | Network callback
[2024-08-13 17:15:17.292 (UTC+0550)] info/network TCP 352 | Attempting to send
[2024-08-13 17:15:17.292 (UTC+0550)] info/network TCP 352 | Attempting to send
[2024-08-13 17:15:17.292 (UTC+0550)] info/network MQTT 352002 | Created connection subscribed on topic "prosysopc/uadp/data/urn:DESKTOP-NA2TQ61:OPCUA:SimulationServer/WriterGroup1/DataSetWriter2"
[2024-08-13 17:15:17.292 (UTC+0550)] info/network TCP 352 | Attempting to send
[2024-08-13 17:15:17.292 (UTC+0550)] info/network MQTT 352001 | Created connection subscribed on topic "prosysopc/uadp/data/urn:DESKTOP-NA2TQ61:OPCUA:SimulationServer/WriterGroup1/VariableDataSetWriter"
[2024-08-13 17:15:17.292 (UTC+0550)] info/eventloop Processing event 1 on fd 352
[2024-08-13 17:15:17.292 (UTC+0550)] info/network TCP 352 | Activity on the socket
[2024-08-13 17:15:17.292 (UTC+0550)] info/network TCP 352 | Allocate receive buffer
[2024-08-13 17:15:17.292 (UTC+0550)] info/network TCP 352 | Received message of size 9
[2024-08-13 17:15:17.292 (UTC+0550)] info/network MQTT-TCP 352 | Network callback
[2024-08-13 17:15:17.292 (UTC+0550)] info/eventloop Processing event 1 on fd 352
[2024-08-13 17:15:17.292 (UTC+0550)] info/network TCP 352 | Activity on the socket
[2024-08-13 17:15:17.292 (UTC+0550)] info/network TCP 352 | Allocate receive buffer
[2024-08-13 17:15:17.292 (UTC+0550)] info/network TCP 352 | Received message of size 5
[2024-08-13 17:15:17.292 (UTC+0550)] info/network MQTT-TCP 352 | Network callback
[2024-08-13 17:15:17.510 (UTC+0550)] info/eventloop Processing event 1 on fd 352
[2024-08-13 17:15:17.510 (UTC+0550)] info/network TCP 352 | Activity on the socket
[2024-08-13 17:15:17.510 (UTC+0550)] info/network TCP 352 | Allocate receive buffer
[2024-08-13 17:15:17.510 (UTC+0550)] info/network TCP 352 | Received message of size 214
[2024-08-13 17:15:17.510 (UTC+0550)] info/network MQTT-TCP 352 | Network callback
[2024-08-13 17:15:17.510 (UTC+0550)] info/network MQTT 352002 | Received a message of 119 bytes
[2024-08-13 17:15:17.510 (UTC+0550)] info/pubsub ReaderGroup ns=1;i=50837 | Reader ns=1;i=50839 | Process Msg with DataSetReader!
[2024-08-13 17:15:17.510 (UTC+0550)] info/pubsub ReaderGroup ns=1;i=50837 | Reader ns=1;i=50839 | Received a network message
[2024-08-13 17:15:17.510 (UTC+0550)] warn/network Messages Received!!
[2024-08-13 17:15:17.510 (UTC+0550)] info/eventloop Processing event 1 on fd 352
[2024-08-13 17:15:17.510 (UTC+0550)] info/network TCP 352 | Activity on the socket
[2024-08-13 17:15:17.510 (UTC+0550)] info/network TCP 352 | Allocate receive buffer
[2024-08-13 17:15:17.510 (UTC+0550)] info/network TCP 352 | Received message of size 221
[2024-08-13 17:15:17.510 (UTC+0550)] info/network MQTT-TCP 352 | Network callback
[2024-08-13 17:15:17.510 (UTC+0550)] info/network MQTT 352001 | Received a message of 119 bytes
[2024-08-13 17:15:17.510 (UTC+0550)] info/pubsub ReaderGroup ns=1;i=50837 | Reader ns=1;i=50839 | DataSetWriterId doesn't match
[2024-08-13 17:15:17.510 (UTC+0550)] info/pubsub ReaderGroup ns=1;i=50817 | Reader ns=1;i=50819 | Process Msg with DataSetReader!
[2024-08-13 17:15:17.510 (UTC+0550)] info/pubsub ReaderGroup ns=1;i=50817 | Reader ns=1;i=50819 | Received a network message
[2024-08-13 17:15:18.516 (UTC+0550)] info/eventloop Processing event 1 on fd 352
[2024-08-13 17:15:18.516 (UTC+0550)] info/network TCP 352 | Activity on the socket
[2024-08-13 17:15:18.516 (UTC+0550)] info/network TCP 352 | Allocate receive buffer
[2024-08-13 17:15:18.516 (UTC+0550)] info/network TCP 352 | Received message of size 214
[2024-08-13 17:15:18.516 (UTC+0550)] info/network MQTT-TCP 352 | Network callback
[2024-08-13 17:15:18.516 (UTC+0550)] info/network MQTT 352002 | Received a message of 119 bytes
[2024-08-13 17:15:18.516 (UTC+0550)] info/pubsub ReaderGroup ns=1;i=50837 | Reader ns=1;i=50839 | Process Msg with DataSetReader!
[2024-08-13 17:15:18.516 (UTC+0550)] info/pubsub ReaderGroup ns=1;i=50837 | Reader ns=1;i=50839 | Received a network message
[2024-08-13 17:15:41.520 (UTC+0550)] info/eventloop Processing event 1 on fd 352
[2024-08-13 17:15:41.520 (UTC+0550)] info/network TCP 352 | Activity on the socket
[2024-08-13 17:15:41.520 (UTC+0550)] info/network TCP 352 | Allocate receive buffer
[2024-08-13 17:15:41.520 (UTC+0550)] info/network TCP 352 | Received message of size 221
[2024-08-13 17:15:41.520 (UTC+0550)] info/network MQTT-TCP 352 | Network callback
[2024-08-13 17:15:41.520 (UTC+0550)] info/network MQTT 352001 | Received a message of 119 bytes
[2024-08-13 17:15:41.520 (UTC+0550)] info/pubsub ReaderGroup ns=1;i=50837 | Reader ns=1;i=50839 | DataSetWriterId doesn't match
[2024-08-13 17:15:41.520 (UTC+0550)] info/pubsub ReaderGroup ns=1;i=50817 | Reader ns=1;i=50819 | Process Msg with DataSetReader!
[2024-08-13 17:15:41.520 (UTC+0550)] info/pubsub ReaderGroup ns=1;i=50817 | Reader ns=1;i=50819 | Received a network message
[2024-08-13 17:15:42.398 (UTC+0550)] info/eventloop Processing event 1 on fd 352
[2024-08-13 17:15:42.398 (UTC+0550)] info/network TCP 352 | Activity on the socket
[2024-08-13 17:15:42.398 (UTC+0550)] info/network TCP 352 | Allocate receive buffer
[2024-08-13 17:15:42.398 (UTC+0550)] info/network TCP 352 | recv signaled the socket was shutdown (The operation completed successfully.)
[2024-08-13 17:15:42.398 (UTC+0550)] info/network TCP 352 | Shutdown triggered
[2024-08-13 17:15:42.398 (UTC+0550)] info/eventloop TCP 352 | Delayed closing of the connection
[2024-08-13 17:15:42.398 (UTC+0550)] info/eventloop Unregistering fd: 352
[2024-08-13 17:15:42.398 (UTC+0550)] info/network MQTT-TCP 352 | Network callback
[2024-08-13 17:15:42.398 (UTC+0550)] info/network MQTT-TCP 352 | Removing the broker connection
[2024-08-13 17:15:42.398 (UTC+0550)] info/network MQTT 352002 | Closing the connection
[2024-08-13 17:15:42.398 (UTC+0550)] info/network TCP | Open a connection to "127.0.0.1" on port 1883
[2024-08-13 17:15:42.400 (UTC+0550)] info/eventloop Registering fd: 616
[2024-08-13 17:15:42.400 (UTC+0550)] info/network TCP 616 | Opening a connection to "127.0.0.1" on port 1883
[2024-08-13 17:15:42.400 (UTC+0550)] info/network MQTT-TCP 616 | Network callback
[2024-08-13 17:15:42.400 (UTC+0550)] info/network MQTT-TCP 616 | Created broker connection
[2024-08-13 17:15:42.400 (UTC+0550)] info/network MQTT-TCP 616 | Closing the broker connection
[2024-08-13 17:15:42.400 (UTC+0550)] info/network TCP 616 | Shutdown triggered
[2024-08-13 17:15:42.400 (UTC+0550)] error/pubsub ReaderGroup ns=1;i=50837 | Could not open the MQTT connection
[2024-08-13 17:15:42.400 (UTC+0550)] error/pubsub ReaderGroup ns=1;i=50837 | Could not connect
[2024-08-13 17:15:42.400 (UTC+0550)] info/network MQTT 352001 | Closing the connection
[2024-08-13 17:15:42.400 (UTC+0550)] info/network TCP 352 | Socket closed
[2024-08-13 17:15:42.400 (UTC+0550)] info/eventloop TCP 616 | Delayed closing of the connection
[2024-08-13 17:15:42.400 (UTC+0550)] info/eventloop Unregistering fd: 616
[2024-08-13 17:15:42.400 (UTC+0550)] info/network MQTT-TCP 616 | Network callback
[2024-08-13 17:15:42.400 (UTC+0550)] info/network MQTT-TCP 616 | Removing the broker connection
[2024-08-13 17:15:42.400 (UTC+0550)] info/network TCP 616 | Socket closed
[2024-08-13 17:15:55.364 (UTC+0550)] info/server received ctrl-c
[2024-08-13 17:15:55.364 (UTC+0550)] error/server The server must be fully stopped before it can be deleted

Waiting for your reply.

Thanks,
Chirantan

Chirantan Joshi

Aug 26, 2024, 1:28:00 PM
to open62541
Hi, any update?

Chirantan Joshi

Sep 24, 2024, 4:01:26 PM
to open62541
Hi Julius,
Any updates regarding this issue?

Julius Pfrommer

Sep 24, 2024, 4:03:48 PM
to open62541
See my answer in the GitHub issue you opened.

This commit should have fixed the problem: 1c9116d

It is on 1.4 and master already.

Chirantan Joshi

Dec 6, 2024, 6:52:13 AM
to open62541
Hi @jpfr,
I tried applying your changes and also cloned the latest release, v1.4.6, but the issue is still present.

I am subscribing to multiple topics through a single PubSubConnection.
When I stop the broker and start it again, the subscriber is not able to reconnect.
Is there anything else we can do here?

Thanks,
Chirantan