Bidirectional streaming C++ - how to process further reads from client on same RPC


Lalit Kumar

Apr 26, 2019, 12:56:04 PM
to grpc.io
Hi,

In my C++ implementation, the client opens a long-lived RPC. The server is implemented asynchronously. Once the client is connected, the server starts sending data to the client whenever data is available. Meanwhile, the client sends more data on the same RPC, which the server needs to read. My logic is to invoke Read() on the same stream once in a while.

Is this the right approach? I'm seeing new data being received as part of Read(); however, after some successful reads the server crashes.


On the server side, the code is roughly as follows (write, process completion queue, write, process completion queue, ..., read(), write, process completion queue, ...):


ServiceThread ()
{
 ----
  while (true) {
    // First service the completion queue.
    status = cq_->AsyncNext(&tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME));  // don't wait
    if (status == CompletionQueue::NextStatus::GOT_EVENT) {
      switch (static_cast<Type>(reinterpret_cast<size_t>(tag))) {
        default:
          if (ok) {
            static_cast<Stream*>(tag)->Proceed();
          } else {
          }
          break;
      }
      if (++i == 10000) {
        std::cout << " Processed msgs " << i << std::endl;
        i = 0;
      }
    } else {
      // Nothing in the completion queue: process the next element from the
      // thread queue, which will invoke the gRPC write() API.
      if (i % 10 == 0) {
        stream.read(..);  // once in a while, check if more data is available to read
      } else {
        ServiceThreadQ(&max_wait);
      }
    }
  }
}
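
One thing that may be worth checking in a loop like the one above is whether a new read can be issued while an earlier read on the same stream is still in flight, and whether read and write completions can be told apart when their tags come back. The following is only a minimal sketch with no gRPC dependency, assuming that a stream should have at most one read and one write outstanding at a time. `OpTag`, `FakeCompletionQueue`, `TryStartRead`, `TryStartWrite`, and `Drain` are hypothetical names for illustration and are not part of gRPC or of the code above.

```cpp
#include <cassert>
#include <queue>

// Hypothetical stand-ins for illustration; none of these names come from gRPC.
enum class Op { kRead, kWrite };

struct Stream;

// A tag records both the stream and the kind of operation that finished.
struct OpTag {
  Stream* stream;
  Op op;
};

// Stand-in for a completion queue: started operations complete in FIFO order.
using FakeCompletionQueue = std::queue<OpTag*>;

struct Stream {
  Stream() = default;
  Stream(const Stream&) = delete;             // tags point back at this object
  Stream& operator=(const Stream&) = delete;

  OpTag read_tag{this, Op::kRead};
  OpTag write_tag{this, Op::kWrite};
  bool read_pending = false;
  bool write_pending = false;
  int reads_done = 0;
  int writes_done = 0;

  // Start a read only if none is in flight. Returns true if one was started.
  bool TryStartRead(FakeCompletionQueue& cq) {
    if (read_pending) return false;
    read_pending = true;
    cq.push(&read_tag);  // a real server would issue its async read here
    return true;
  }

  // Start a write only if none is in flight. Returns true if one was started.
  bool TryStartWrite(FakeCompletionQueue& cq) {
    if (write_pending) return false;
    write_pending = true;
    cq.push(&write_tag);  // a real server would issue its async write here
    return true;
  }

  // Called when a completion for this stream is taken off the queue.
  void Proceed(Op op) {
    if (op == Op::kRead) {
      read_pending = false;
      ++reads_done;
    } else {
      write_pending = false;
      ++writes_done;
    }
  }
};

// Drain the queue, dispatching each completion to its stream.
// Returns the number of completions processed.
inline int Drain(FakeCompletionQueue& cq) {
  int processed = 0;
  while (!cq.empty()) {
    OpTag* tag = cq.front();
    cq.pop();
    tag->stream->Proceed(tag->op);
    ++processed;
  }
  return processed;
}
```

With this shape, the "once in a while" branch would call `TryStartRead` and simply skip the read when it returns false, instead of issuing a second read on top of a pending one.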

ServiceThreadQ (struct timespec *max_wait) {
  // Process just one element from the queue. If it is empty, do a timed wait.
  pthread_mutex_lock(&mq->mu_queue);
  if (!mq->msg_queue.empty()) {
    intf_counters *cntr = mq->msg_queue.front();
    mq->msg_queue.pop();
    pthread_mutex_unlock(&mq->mu_queue);
    PushStream(cntr);  // invokes the write() API and does no other work
    free(cntr);
    return;
  } else {
    int rv = pthread_cond_timedwait(&mq->cond, &mq->mu_queue, max_wait);
    switch (rv) {
      ----
    }
    pthread_mutex_unlock(&mq->mu_queue);
    return;
  }
}
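
For comparison, the same "pop one element or do a timed wait" step can be sketched with the C++ standard library instead of raw pthread calls, so the mutex is released automatically on every return path. This is a sketch under assumptions, not the code used above: `MsgQueue`, `Push`, and `PopOne` are hypothetical names, the wait is given as a relative duration rather than a `timespec`, and unlike the original it also pops an element that arrives during the wait.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical queue type for illustration only.
template <typename T>
class MsgQueue {
 public:
  // Add one element and wake a waiting consumer, if any.
  void Push(T value) {
    {
      std::lock_guard<std::mutex> lock(mu_);
      items_.push(std::move(value));
    }
    cond_.notify_one();
  }

  // Pop one element into *out, waiting up to max_wait if the queue is empty.
  // Returns false if the queue is still empty when the wait ends.
  bool PopOne(T* out, std::chrono::milliseconds max_wait) {
    std::unique_lock<std::mutex> lock(mu_);
    if (!cond_.wait_for(lock, max_wait, [this] { return !items_.empty(); })) {
      return false;  // timed out; lock is released by the destructor
    }
    *out = std::move(items_.front());
    items_.pop();
    return true;
  }

 private:
  std::mutex mu_;
  std::condition_variable cond_;
  std::queue<T> items_;
};
```

A caller would then invoke the write path only when `PopOne` returns true, which mirrors the first branch of `ServiceThreadQ`.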

Thanks