#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#if defined(WIN32) || defined(WIN64)
#include <windows.h>
#else
#include <unistd.h>
#endif
#include "MQTTAsync.h"
/* Broker URI handed to MQTTAsync_createWithOptions. It was used but never
 * defined; the guard lets a build override it with -DADDRESS="...". */
#ifndef ADDRESS
#define ADDRESS "tcp://localhost:1883"
#endif
/* Client identifier passed when the client is created. */
#define CLIENTID "ExampleClientSub"
/* Topic the request message is published to. */
#define TOPIC "MQTT Examples"
/* Only printed in the connect banner in this file. */
#define PAYLOAD "Hello World!"
/* QoS requested for the subscription. */
#define QOS 1
/* Not referenced in this file. */
#define TIMEOUT 10000L
/* Topic subscribed to after connecting. */
#define TOPICS "resp"
/* Delivery token; reset to 0 in onConnect before the publish is started. */
volatile MQTTAsync_token deliveredtoken;
/*
 * Progress flags written by the callbacks and polled by main.
 * NOTE(review): these are plain ints; if the library invokes callbacks on a
 * different thread than main, the accesses are unsynchronized - confirm.
 */
/* Set to 1 by onDisconnect. */
int disc_finished = 0;
/* Set to 1 by onSubscribe. */
int subscribed = 0;
/* Set to 1 when a connect, reconnect or subscribe attempt fails. */
int finished = 0;
/* Prototype for the helper defined immediately below. */
int MQTTPacket_VBIlen(int);
/**
 * Number of bytes (1..4) needed to encode a length, judging by the name as a
 * variable byte integer: each threshold is a further 7 bits (2^7, 2^14, 2^21).
 *
 * @param rem_len  the length value to be encoded
 * @return 1, 2, 3 or 4; values below 128 (including negatives) yield 1
 */
int MQTTPacket_VBIlen(int rem_len)
{
    if (rem_len < 128)
        return 1;
    if (rem_len < 16384)
        return 2;
    if (rem_len < 2097152)
        return 3;
    return 4;
}
/**
 * Total length of a property list: the list's own length plus the bytes
 * MQTTPacket_VBIlen reports for that length.
 *
 * @param props  property list, may be NULL
 * @return 1 when props is NULL, otherwise props->length plus its prefix size
 */
int MQTTProperties_len(MQTTProperties* props)
{
    if (props == NULL)
        return 1;

    const int body_len = props->length;
    return body_len + MQTTPacket_VBIlen(body_len);
}
void connlost(void *context, char *cause)
{
MQTTAsync client = (MQTTAsync)context;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
int rc;
printf("\nConnection lost\n");
printf(" cause: %s\n", cause);
printf("Reconnecting\n");
MQTTProperties prop = MQTTProperties_initializer;
MQTTProperty property;
MQTTLenString name;
prop.count = 1;
prop.max_count = 10;
property.identifier = MQTTPROPERTY_CODE_REQUEST_RESPONSE_INFORMATION;
property.value.byte = 1;
MQTTProperties_add(&prop, &property);
conn_opts.keepAliveInterval = 20;
conn_opts.connectProperties = ∝
//conn_opts.cleansession = 1;
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start connect, return code %d\n", rc);
finished = 1;
}
}
/**
 * Message-arrived callback: prints the topic and the raw payload bytes, then
 * releases the message and the topic string.
 *
 * @param context    unused
 * @param topicName  topic string, printed with %s and freed here
 * @param topicLen   unused
 * @param message    received message; freed here
 * @return always 1
 */
int msgarrvd(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
{
    const char *cursor = message->payload;
    int remaining;

    (void)context;
    (void)topicLen;

    printf("Message arrived\n");
    printf(" topic: %s\n", topicName);
    printf(" message: ");
    /* The payload is emitted byte by byte using payloadlen, not as a C string. */
    for (remaining = message->payloadlen; remaining > 0; remaining--)
        putchar(*cursor++);
    putchar('\n');

    MQTTAsync_freeMessage(&message);
    MQTTAsync_free(topicName);
    return 1;
}
/**
 * Disconnect-success callback: logs the event and raises `disc_finished`,
 * the flag main polls while waiting for the disconnect.
 */
void onDisconnect(void* context, MQTTAsync_successData5* response)
{
    (void)context;
    (void)response;

    fputs("Successful disconnection\n", stdout);
    disc_finished = 1;
}
/**
 * Subscribe-success callback: logs the event and raises `subscribed`,
 * the flag main polls before reading keyboard input.
 */
void onSubscribe(void* context, MQTTAsync_successData5* response)
{
    (void)context;
    (void)response;

    fputs("Subscribe succeeded\n", stdout);
    subscribed = 1;
}
/**
 * Subscribe-failure callback: prints the failure code (0 when no failure
 * data is supplied) and raises `finished`.
 */
void onSubscribeFailure(void* context, MQTTAsync_failureData5* response)
{
    int code = 0;

    (void)context;

    if (response != NULL)
        code = response->code;
    printf("Subscribe failed, rc %d\n", code);
    finished = 1;
}
/**
 * Connect-failure callback: prints the failure code (0 when no failure
 * data is supplied) and raises `finished`.
 */
void onConnectFailure(void* context, MQTTAsync_failureData5* response)
{
    int code = 0;

    (void)context;

    if (response != NULL)
        code = response->code;
    printf("Connect failed, rc %d\n", code);
    finished = 1;
}
/**
 * Publish-success callback: reports the delivered token, then starts a
 * disconnect whose completion is signalled through onDisconnect.
 *
 * @param context   callback context, cast back to the MQTTAsync handle
 * @param response  success data; its token is printed (0 if NULL)
 *
 * Exits the process with EXIT_FAILURE if the disconnect cannot be started.
 */
void onSend(void* context, MQTTAsync_successData5* response)
{
    MQTTAsync client = (MQTTAsync)context;
    MQTTAsync_disconnectOptions opts = MQTTAsync_disconnectOptions_initializer;
    int rc;

    printf("Message with token value %d delivery confirmed\n",
           response ? response->token : 0);

    opts.onSuccess5 = onDisconnect;
    opts.context = client;
    if ((rc = MQTTAsync_disconnect(client, &opts)) != MQTTASYNC_SUCCESS)
    {
        /* The failing call is the disconnect, so the message names it. */
        printf("Failed to start disconnect, return code %d\n", rc);
        exit(EXIT_FAILURE);
    }
}
/**
 * Connect-success callback: publishes a request on TOPIC carrying a
 * Response Topic property of TOPICS, then subscribes to TOPICS.
 *
 * @param context   callback context, cast back to the MQTTAsync handle
 * @param response  unused
 *
 * onSend is registered for publish completion; onSubscribe and
 * onSubscribeFailure for the subscription. Exits the process with
 * EXIT_FAILURE if either operation cannot be started.
 */
void onConnect(void* context, MQTTAsync_successData5* response)
{
    MQTTAsync client = (MQTTAsync)context;
    MQTTAsync_responseOptions sub_opts = MQTTAsync_responseOptions_initializer;
    MQTTAsync_responseOptions pub_opts = MQTTAsync_responseOptions_initializer;
    MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
    MQTTProperties pub_props = MQTTProperties_initializer;
    MQTTProperty property;
    int rc;

    (void)response;

    /* Let MQTTProperties_add manage count/max_count: presetting count to 1
     * would leave slot 0 of the array uninitialized. */
    property.identifier = MQTTPROPERTY_CODE_RESPONSE_TOPIC;
    property.value.data.data = TOPICS;
    property.value.data.len = (int)strlen(TOPICS);
    if ((rc = MQTTProperties_add(&pub_props, &property)) != 0)
    {
        printf("Failed to add response topic property, return code %d\n", rc);
        exit(EXIT_FAILURE);
    }

    printf("Successful connection\n");
    printf("Waiting for publication of %s\n" "on topic %s for client with ClientID: %s\n",
           PAYLOAD, TOPIC, CLIENTID);

    /* The message carries the PUBLISH properties; the response options keep
     * the same value so either source yields the response topic.
     * NOTE(review): no payload or QoS is set on pubmsg, so an empty message
     * is published even though PAYLOAD is printed above - confirm intent. */
    pubmsg.properties = pub_props;
    pub_opts.properties = pub_props;
    pub_opts.context = client;
    pub_opts.onSuccess5 = onSend;
    deliveredtoken = 0;
    if ((rc = MQTTAsync_sendMessage(client, TOPIC, &pubmsg, &pub_opts)) != MQTTASYNC_SUCCESS)
    {
        printf("Failed to start sendMessage, return code %d\n", rc);
        exit(EXIT_FAILURE);
    }

    printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
           "Press Q<Enter> to quit\n\n", TOPICS, CLIENTID, QOS);
    /* The response topic is a publish property, so it is not set here. */
    sub_opts.onSuccess5 = onSubscribe;
    sub_opts.onFailure5 = onSubscribeFailure;
    sub_opts.context = client;
    if ((rc = MQTTAsync_subscribe(client, TOPICS, QOS, &sub_opts)) != MQTTASYNC_SUCCESS)
    {
        printf("Failed to start subscribe, return code %d\n", rc);
        exit(EXIT_FAILURE);
    }

    MQTTProperties_free(&pub_props);
}
int main(int argc, char* argv[])
{
MQTTAsync client;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer5;
MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
MQTTAsync_token token;
int rc;
int ch;
MQTTAsync_createOptions crea_opts = MQTTAsync_createOptions_initializer;
crea_opts.MQTTVersion = MQTTVERSION_5;
MQTTAsync_createWithOptions(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL, &crea_opts);
MQTTAsync_setCallbacks(client, NULL, connlost, msgarrvd, NULL);
MQTTProperties prop = MQTTProperties_initializer;
MQTTProperty property;
MQTTLenString name;
prop.count = 1;
prop.max_count = 10;
property.identifier = MQTTPROPERTY_CODE_REQUEST_RESPONSE_INFORMATION;
property.value.byte = 1;
MQTTProperties_add(&prop, &property);
conn_opts.keepAliveInterval = 20;
//conn_opts.cleansession = 1;
conn_opts.onSuccess5 = onConnect;
conn_opts.onFailure5 = onConnectFailure;
conn_opts.context = client;
conn_opts.connectProperties = ∝
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start connect, return code %d\n", rc);
exit(EXIT_FAILURE);
}
while (!subscribed)
#if defined(WIN32) || defined(WIN64)
Sleep(100);
#else
usleep(10000L);
#endif
if (finished) //if failure occurs
goto exit;
do
{
ch = getchar();
} while (ch!='Q' && ch != 'q');
disc_opts.onSuccess5 = onDisconnect;
// disc_opts.properties = prop;
if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start disconnect, return code %d\n", rc);
exit(EXIT_FAILURE);
}
while (!disc_finished)
#if defined(WIN32) || defined(WIN64)
Sleep(100);
#else
usleep(10000L);
#endif
exit:
MQTTAsync_destroy(&client);
return rc;
}