High CPU load when using Majordomo pattern

Bogdan Glinskiy

Nov 29, 2016, 2:01:39 PM
to zeromq

I have a few managers that react to client requests, each with its own business logic. I parse each request and, based on its parameters, send it to the matching manager. I want some kind of "forwarder" in the middle, between the requests and the managers, so for this I decided to use the Majordomo pattern. But after writing a test application to see how this pattern behaves, I saw that 1 client and 1 worker load my CPU to approximately 5%. With 3 clients and 3 workers the CPU load is around 18-20%, which is very high (each client sends 20 requests per second).
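
To make that concrete: the plan is for each manager to register as its own Majordomo service, so the broker does the dispatch by service name. A rough sketch of the client side (the "orders" service name and payload are just illustrative):

//  Sketch: route a request to a particular manager purely by MDP service
//  name. The broker does the dispatch; the client never sees the manager.
//  "orders" is an illustrative service name.
void route_to_orders()
{
    mdp_client_t *session = mdp_client_new("tcp://localhost:5555", false);
    zmsg_t *request = zmsg_new();
    zmsg_pushstr(request, "order parameters");
    mdp_client_send(session, "orders", &request);
    zmsg_t *reply = mdp_client_recv(session, NULL, NULL);
    if (reply)
        zmsg_destroy(&reply);
    mdp_client_destroy(&session);
}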

I also tried doing this without the Majordomo pattern, using a simple REQ-REP pair for each client/worker. In that case the CPU load with 3 workers and 3 clients is around 2%, and that is the absolute maximum I saw during the test.
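
The baseline test looked roughly like this (a minimal sketch, assuming CZMQ's zsock API; the port number is arbitrary):

#include <thread>
#include <chrono>
#include "czmq.h"

//  Echo server: a plain REP socket, no broker in between
void rep_worker()
{
    zsock_t *rep = zsock_new_rep("tcp://*:5556");
    while (true) {
        zmsg_t *msg = zmsg_recv(rep);
        if (!msg)
            break;              //  Interrupted
        zmsg_send(&msg, rep);   //  Echo the request straight back
    }
    zsock_destroy(&rep);
}

//  Client: ~20 requests per second, the same rate as the Majordomo test
void req_client()
{
    zsock_t *req = zsock_new_req("tcp://localhost:5556");
    while (true) {
        zstr_send(req, "Hello world");
        char *reply = zstr_recv(req);
        if (!reply)
            break;              //  Interrupted
        zstr_free(&reply);
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }
    zsock_destroy(&req);
}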

So my question is: is this CPU load caused by this particular Majordomo implementation, by my code, or by the Majordomo pattern itself?

My test app code:

#include <thread>
#include <chrono>
#include "mdp_broker.c"     //  Majordomo broker, worker, and client sources
#include "mdp_worker.c"
#include "mdp_client.c"

//  Broker thread: Majordomo broker main loop (poll, route, heartbeat)
void broker()
{
    broker_t *self = s_broker_new(false);
    s_broker_bind(self, "tcp://*:5555");

    while (true) {
        zmq_pollitem_t items[] = {{ self->socket, 0, ZMQ_POLLIN, 0 }};
        int rc = zmq_poll(items, 1, HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
        if (rc == -1)
            break;              //  Interrupted

        if (items[0].revents & ZMQ_POLLIN) {
            zmsg_t *msg = zmsg_recv(self->socket);
            if (!msg)
                break;          //  Interrupted

            zframe_t *sender = zmsg_pop(msg);   //  Identity of the sender
            zframe_t *empty = zmsg_pop(msg);    //  Empty delimiter frame
            zframe_t *header = zmsg_pop(msg);   //  MDP protocol header

            if (zframe_streq(header, MDPC_CLIENT))
                s_broker_client_msg(self, sender, msg);
            else if (zframe_streq(header, MDPW_WORKER))
                s_broker_worker_msg(self, sender, msg);
            else {
                zmsg_destroy(&msg);             //  Invalid header, discard the message
            }

            zframe_destroy(&sender);
            zframe_destroy(&empty);
            zframe_destroy(&header);
        }

        //  Disconnect and delete expired workers, then send heartbeats
        //  to the workers that are still waiting
        if (zclock_time() > self->heartbeat_at) {
            s_broker_purge(self);
            worker_t *worker = (worker_t *)zlist_first(self->waiting);
            while (worker) {
                s_worker_send(worker, MDPW_HEARTBEAT, NULL, NULL);
                worker = (worker_t *)zlist_next(self->waiting);
            }
            self->heartbeat_at = zclock_time() + HEARTBEAT_INTERVAL;
        }
    }

    s_broker_destroy(&self);
}

//  Worker thread: registers the "echo" service and echoes every request back
void worker()
{
    mdp_worker_t *session = mdp_worker_new(NULL, "tcp://localhost:5555", "echo", false);

    while (1) {
        zframe_t *reply_to;
        zmsg_t *request = mdp_worker_recv(session, &reply_to);
        if (request == NULL)
            break;              //  Interrupted
        mdp_worker_send(session, &request, reply_to);   //  Echo the request back
        zframe_destroy(&reply_to);
    }

    mdp_worker_destroy(&session);
}

//  Client thread: sends "Hello world" to the "echo" service 20 times per second
void client()
{
    mdp_client_t *session = mdp_client_new("tcp://localhost:5555", false);

    for (;;) {
        zmsg_t *request = zmsg_new();
        zmsg_pushstr(request, "Hello world");
        mdp_client_send(session, "echo", &request);
        zmsg_t *reply = mdp_client_recv(session, NULL, NULL);
        if (reply)
            zmsg_destroy(&reply);
        else
            break;              //  Interrupted
        std::this_thread::sleep_for(std::chrono::milliseconds(50));    //  ~20 requests per second
    }

    mdp_client_destroy(&session);
}

int main()
{
    std::thread brokerThread(broker);
    std::thread workerThread(worker);
    std::thread clientThread(client);

    //  The threads loop until interrupted; join them so main() never returns
    //  while they are still joinable (destroying a joinable std::thread
    //  would call std::terminate)
    brokerThread.join();
    workerThread.join();
    clientThread.join();

    return 0;
}