Hi All,
I've recently started exploring gRPC for server-side telemetry. Both the client and the server run on the same host machine.
The client sends one subscribe request and then loops, waiting for streamed messages. The server is implemented with the async API
and uses two threads. The producer thread pushes data onto a queue (say 60000 elements in a tight loop) and then sleeps for 5 seconds.
The service thread reads data from the queue and invokes the write() API. As I understand from the documentation and examples, only one write is allowed until its completion notification
has been received and serviced via AsyncNext, so the flow effectively becomes: service completion queue, write, service completion queue, write, ...
With this I'm getting a maximum of 10K stream messages per second (the encoder is not really doing much).
My first question: is this the right way to use the C++ binding? Second question: is 10K per second the maximum? (It seems too low, and I'm sure I'm missing something here.)
void ServiceThread()
{
----
while (true) {
// First service the Completion Q
status = cq_->AsyncNext(&tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME)); // zero deadline: don't wait
if (status == CompletionQueue::NextStatus::GOT_EVENT) {
switch (static_cast<Type>(reinterpret_cast<size_t>(tag))) {
default :
if (ok) {
static_cast<Stream*>(tag)->Proceed();
} else {
}
break;
}
if (++i == 10000) {
std::cout << " Processed msgs " << i << std::endl;
i = 0;
}
} else {
// Nothing in the completion Q: process the next element from the thread Q, which invokes the grpc write() API.
ServiceThreadQ(&max_wait);
}
}
}
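To make the "one write in flight" rule in the loop above explicit, here is a minimal single-threaded model of it that does not use gRPC at all. `FakeCompletionQueue`, `GatedWriter`, `RunServiceLoop`, and their methods are hypothetical names invented for this sketch, not gRPC APIs, and an `int` stands in for the message. In the model a write "completes" as soon as it is started, so it only illustrates the alternation, not real timing.

```cpp
#include <cstddef>
#include <deque>
#include <queue>
#include <vector>

// Hypothetical stand-in for a completion queue: holds completion tags
// and hands back at most one per Poll() call.
class FakeCompletionQueue {
 public:
  void Post(int tag) { events_.push_back(tag); }

  // Non-blocking poll, loosely analogous to AsyncNext with a zero deadline.
  bool Poll(int* tag) {
    if (events_.empty()) return false;
    *tag = events_.front();
    events_.pop_front();
    return true;
  }

 private:
  std::deque<int> events_;
};

// Hypothetical writer that refuses a new write while one is outstanding.
class GatedWriter {
 public:
  explicit GatedWriter(FakeCompletionQueue* cq) : cq_(cq) {}

  // Starts a write if none is in flight; returns false otherwise.
  bool TryWrite(int msg) {
    if (in_flight_) return false;
    in_flight_ = true;
    sent_.push_back(msg);
    cq_->Post(msg);  // assumption: the write completes immediately
    return true;
  }

  // Called when the completion for the outstanding write is serviced.
  void OnWriteDone() { in_flight_ = false; }

  const std::vector<int>& sent() const { return sent_; }

 private:
  FakeCompletionQueue* cq_;
  bool in_flight_ = false;
  std::vector<int> sent_;
};

// Alternates between servicing one completion and starting one write.
// Returns the number of loop iterations needed to send every message.
std::size_t RunServiceLoop(std::queue<int> pending, GatedWriter* writer,
                           FakeCompletionQueue* cq) {
  std::size_t iterations = 0;
  bool awaiting = false;  // true while a write completion is outstanding
  while (!pending.empty() || awaiting) {
    ++iterations;
    int tag = 0;
    if (cq->Poll(&tag)) {
      writer->OnWriteDone();
      awaiting = false;
    } else if (!pending.empty() && writer->TryWrite(pending.front())) {
      pending.pop();
      awaiting = true;
    }
  }
  return iterations;
}
```

In this model every message costs two passes through the loop (one to start the write, one to service its completion), which is the same alternation the service thread above goes through.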
void ServiceThreadQ(struct timespec *max_wait) {
// Process just one element from Q. If Empty then do timed wait.
pthread_mutex_lock(&mq->mu_queue);
if(!mq->msg_queue.empty()) {
intf_counters *cntr = mq->msg_queue.front();
mq->msg_queue.pop();
pthread_mutex_unlock(&mq->mu_queue);
PushStream(cntr); // invokes write() API and no other work
free(cntr);
return;
} else {
int rv = pthread_cond_timedwait(&mq->cond, &mq->mu_queue, max_wait);
switch (rv) {
----
}
pthread_mutex_unlock(&mq->mu_queue);
return;
}
}
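For comparison, the pop-one-or-timed-wait logic above can be sketched with the C++ standard library instead of raw pthreads. `MsgQueue`, `Push`, and `PopOne` are hypothetical names, and `int` stands in for `intf_counters`; this is an illustration under those assumptions, not the actual server code. One deliberate difference: after a successful wait this version pops the element right away, whereas the function above returns and picks it up on the next call.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <queue>

// Hypothetical thread-safe queue with a bounded wait on the consumer side.
class MsgQueue {
 public:
  // Producer side: enqueue one element and wake one waiting consumer.
  void Push(int msg) {
    {
      std::lock_guard<std::mutex> lock(mu_);
      q_.push(msg);
    }
    cond_.notify_one();
  }

  // Consumer side: pop one element into *out. If the queue is empty,
  // wait up to max_wait for a producer. Returns false on timeout.
  bool PopOne(int* out, std::chrono::milliseconds max_wait) {
    std::unique_lock<std::mutex> lock(mu_);
    // wait_for with a predicate handles spurious wakeups and returns
    // the predicate's value when the wait ends.
    if (!cond_.wait_for(lock, max_wait, [this] { return !q_.empty(); })) {
      return false;
    }
    *out = q_.front();
    q_.pop();
    return true;
  }

 private:
  std::mutex mu_;
  std::condition_variable cond_;
  std::queue<int> q_;
};
```

The lock is released automatically when `PopOne` returns, so the caller can invoke the write outside the critical section, as the pthread version does.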
Thanks