I have a few managers that react to client requests. Each manager has its own business logic. I parse each request and, according to its parameters, send it to a particular manager. I want to put a "forwarder" between these requests and the managers, so for this goal I decided to use the Majordomo pattern. But after writing a test application to see how this pattern works, I saw that 1 client and 1 worker load my CPU to approximately 5%. With 3 clients and 3 workers my CPU load is around 18-20%, which is very high (each client sends 20 requests per second).
I also tried doing this without the Majordomo pattern, using a simple REQ-REP pattern between each client and worker. In this case the CPU load with 3 workers and 3 clients is around 2%, and this value is the absolute maximum I've seen during the test.
So my question is: is this CPU load caused by the Majordomo pattern implementation, by my code, or by the Majordomo pattern itself?
My test app code:
#include <thread>
#include "mdp_broker.c"
#include "mdp_worker.c"
#include "mdp_client.c"
void broker()
{
broker_t *self = s_broker_new(false);
s_broker_bind(self, "tcp://*:5555");
while (true) {
zmq_pollitem_t items[] = {{ self->socket, 0, ZMQ_POLLIN, 0 }};
int rc = zmq_poll(items, 1, HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
if (rc == -1)
break;
if (items[0].revents & ZMQ_POLLIN) {
zmsg_t *msg = zmsg_recv(self->socket);
if (!msg)
break;
zframe_t *sender = zmsg_pop(msg);
zframe_t *empty = zmsg_pop(msg);
zframe_t *header = zmsg_pop(msg);
if (zframe_streq(header, MDPC_CLIENT))
s_broker_client_msg(self, sender, msg);
else if (zframe_streq(header, MDPW_WORKER))
s_broker_worker_msg(self, sender, msg);
else {
zmsg_destroy(&msg);
}
zframe_destroy(&sender);
zframe_destroy(&empty);
zframe_destroy(&header);
}
if (zclock_time() > self->heartbeat_at) {
s_broker_purge(self);
worker_t *worker = (worker_t *)zlist_first(self->waiting);
while (worker) {
s_worker_send(worker, MDPW_HEARTBEAT, NULL, NULL);
worker = (worker_t *)zlist_next(self->waiting);
}
self->heartbeat_at = zclock_time() + HEARTBEAT_INTERVAL;
}
}
s_broker_destroy(&self);
}
void worker()
{
mdp_worker_t *session = mdp_worker_new(NULL, "tcp://localhost:5555", "echo", false);
while (1) {
zframe_t *reply_to;
zmsg_t *request = mdp_worker_recv(session, &reply_to);
if (request == NULL)
break;
mdp_worker_send(session, &request, reply_to);
zframe_destroy(&reply_to);
}
mdp_worker_destroy(&session);
}
void client()
{
mdp_client_t *session = mdp_client_new("tcp://localhost:5555", false);
for (;;) {
zmsg_t *request = zmsg_new();
zmsg_pushstr(request, "Hello world");
mdp_client_send(session, "echo", &request);
zmsg_t *reply = mdp_client_recv(session, NULL, NULL);
if (reply) {
zmsg_destroy(&reply);
}
else
break;
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
mdp_client_destroy(&session);
}
// Starts the broker, one worker and one client, each on its own thread, and
// waits for all of them to finish.
//
// The threads are joined rather than kept alive with a fixed-length sleep:
// returning from main while a std::thread is still joinable makes its
// destructor call std::terminate.
int main()
{
    std::thread brokerThread(broker);
    std::thread workerThread(worker);
    std::thread clientThread(client);

    clientThread.join();
    workerThread.join();
    brokerThread.join();
    return 0;
}