A Mosquitto MQTT benchmark

223 views
Skip to first unread message

Thành Khắc Nguyễn

unread,
Jun 15, 2017, 12:40:18 AM6/15/17
to MQTT
Hi guys!
I am writing a tool to benchmark mosquitto from one computer by emulating 50K concurrent connections to a broker.
It follows the steps below:
1. Initialize a common config.
2. Create a bunch of threads; in each one, change the id, username, and password, then create a new mosquitto instance that connects and holds a connection to the MQTT broker.
I have already raised all the ulimit settings, but when it reaches about 340 connections, the function mosquitto_loop_forever fails with the error "Invalid function arguments provided".
I could achieve 50K threads without MQTT.
Please help me with this problem.
Thanks and Regards

Here is my code:
/*
 * Argument bundle handed to each worker thread through pthread_create().
 */
typedef struct {
    /* Client configuration the worker reads its connection settings from. */
    struct mosq_config *cfg;

    /* Index of the worker, as assigned by the thread-creation loop. */
    int client_idx;
} pthread_argv_t;

int main(int argc, char *argv[]) {
    struct mosq_config cfg;
    int rc, threadcnt = 0;
    pthread_t thid;
    pthread_argv_t pthdata;
    pthread_attr_t thread_attr;
    if ((rc = client_config_load(&cfg, argc, argv))) {
        client_config_cleanup(&cfg);
        return -1;
    }
    pthdata.cfg = &cfg;

    for (int i = 0; i <= 342; i++) {
        printf("--> Creating thread no %d ...\n", i);
        pthdata.client_idx = i;
        rc = pthread_create(&thid, &thread_attr, (void *)mqtt_publish_thread, (void *)&pthdata);
        if (rc) {
            // error printf
        } else {
            printf("--> Created thread no %d id %ld\n", i, thid);
            threadcnt++;           
            printf("Number of created thread [\033[31m%d\033[0m]\n", threadcnt);
        }
        usleep(50000);
    }
    while (true) {
        printf("Main thread running ...\n ");
        sleep(10);
    }   
    mosquitto_lib_cleanup();
    return rc;
}   

void mqtt_publish_thread(void *argv) {
    char err[256];
    int rc;
   
    pthread_argv_t *pthdata = (pthread_argv_t *)argv;
    struct mosquitto *mosq = NULL;
    printf("    --> Inside thread %ld\n", pthread_self());   
    mosq = mosquitto_new(pthdata->cfg->id, true, pthdata->cfg);
    if (!mosq) {
        // error printf
        goto exit;
    }
    mosquitto_connect_callback_set(mosq, my_connect_callback);
    mosquitto_disconnect_callback_set(mosq, my_disconnect_callback);

    pthdata->cfg->username = new_username();
    pthdata->cfg->id = strdup(pthdata->cfg->username);
    pthdata->cfg->password = new_pasS();
    //printf("    --> Id <%s>@<%s>\n", pthdata->cfg->username, pthdata->cfg->password);   
    if (client_opts_set(mosq, pthdata->cfg)) goto exit;   
    if (client_connect(mosq, pthdata->cfg)) goto exit;   
    rc = mosquitto_loop_forever(mosq, pthdata->cfg->keepalive * 1000, 1);
    if (rc == MOSQ_ERR_ERRNO) {
        strerror_r(errno, err, 1024);
        printf("\033[31mError\033[0m: %s\n", err);
    } else {
        printf("Unable to loop <%d>(%s).\n", rc, mosquitto_strerror(rc));
    }
exit:
    pthread_exit(NULL);
}
Reply all
Reply to author
Forward
0 new messages