Hi guys!
I am writing a tool to benchmark mosquitto: it runs on a single computer and emulates 50K concurrent connections to a broker.
It follows the steps below:
1. Initialize a common config.
2. Create a bunch of threads; in each thread, change the id, username and password, then create a new mosquitto instance that connects to the MQTT broker and holds the connection open.
I have already raised all the relevant ulimit values, but when the tool reaches about 340 connections, the function mosquitto_loop_forever fails with the error "Invalid function arguments provided".
I could achieve 50K threads without MQTT.
Please help me with this problem.
Thanks and Regards
Here is my code:
/*
 * Argument bundle handed to a client thread through the void * parameter
 * of its start routine.
 */
typedef struct {
    /* Configuration the thread reads its connection settings from. */
    struct mosq_config *cfg;
    /* Index of the client, assigned by the code that spawns the thread. */
    int client_idx;
} pthread_argv_t;
int main(int argc, char *argv[]) {
struct mosq_config cfg;
int rc, threadcnt = 0;
pthread_t thid;
pthread_argv_t pthdata;
pthread_attr_t thread_attr;
if ((rc = client_config_load(&cfg, argc, argv))) {
client_config_cleanup(&cfg);
return -1;
}
pthdata.cfg = &cfg;
for (int i = 0; i <= 342; i++) {
printf("--> Creating thread no %d ...\n", i);
pthdata.client_idx = i;
rc = pthread_create(&thid, &thread_attr, (void *)mqtt_publish_thread, (void *)&pthdata);
if (rc) {
// error printf
} else {
printf("--> Created thread no %d id %ld\n", i, thid);
threadcnt++;
printf("Number of created thread [\033[31m%d\033[0m]\n", threadcnt);
}
usleep(50000);
}
while (true) {
printf("Main thread running ...\n ");
sleep(10);
}
mosquitto_lib_cleanup();
return rc;
}
void mqtt_publish_thread(void *argv) {
char err[256];
int rc;
pthread_argv_t *pthdata = (pthread_argv_t *)argv;
struct mosquitto *mosq = NULL;
printf(" --> Inside thread %ld\n", pthread_self());
mosq = mosquitto_new(pthdata->cfg->id, true, pthdata->cfg);
if (!mosq) {
// error printf
goto exit;
}
mosquitto_connect_callback_set(mosq, my_connect_callback);
mosquitto_disconnect_callback_set(mosq, my_disconnect_callback);
pthdata->cfg->username = new_username();
pthdata->cfg->id = strdup(pthdata->cfg->username);
pthdata->cfg->password = new_pasS();
//printf(" --> Id <%s>@<%s>\n", pthdata->cfg->username, pthdata->cfg->password);
if (client_opts_set(mosq, pthdata->cfg)) goto exit;
if (client_connect(mosq, pthdata->cfg)) goto exit;
rc = mosquitto_loop_forever(mosq, pthdata->cfg->keepalive * 1000, 1);
if (rc == MOSQ_ERR_ERRNO) {
strerror_r(errno, err, 1024);
printf("\033[31mError\033[0m: %s\n", err);
} else {
printf("Unable to loop <%d>(%s).\n", rc, mosquitto_strerror(rc));
}
exit:
pthread_exit(NULL);
}