Java gRPC - Most efficient way to deal with client onReady() and isReady()

1,819 views
Skip to first unread message

Matt Mitchell

unread,
Dec 31, 2017, 10:07:58 PM12/31/17
to grpc.io
Hi,

I'm using onReady() to initialize my client connection (a long lived bidi connection) and then using isReady() like:

public void onNext(MyMessage msg) {
  while( ! requestStream.isReady() ){ sleep-for-som-short-time }
  requestStream.request(1);
  requestStream.onNext(msg);
}

Which seems to work fine, but it also causes high CPU usage. Is there a better way to handle sending messages from the client to server without looping + sleeping like this?

Thanks,
- Matt

Eric Gribkoff

unread,
Jan 2, 2018, 11:48:28 AM1/2/18
to Matt Mitchell, grpc.io
If you're just wanting the client to send one request every time it receives a response, you can typically ignore #isReady() and let the automatic flow-control handle things. If you need to do something more complicated, like manually handling flow control, CallStreamObserver#setOnReadyHandler will let you register a callback when the #isReady() state changes from false to true. You can see an example using this in the manual flow control client example code, https://github.com/grpc/grpc-java/blob/c9b02db276403db4794c6e5ffc78b46889cd4ce8/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlClient.java#L70.



--
You received this message because you are subscribed to the Google Groups "grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email to grpc-io+unsubscribe@googlegroups.com.
To post to this group, send email to grp...@googlegroups.com.
Visit this group at https://groups.google.com/group/grpc-io.
To view this discussion on the web visit https://groups.google.com/d/msgid/grpc-io/ad2ee22e-9b55-42f5-905c-1928ea274468%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Matt Mitchell

unread,
Jan 3, 2018, 7:32:42 PM1/3/18
to grpc.io
Hi,

The way my service works is the client will establish a bidi stream. The server then sends a message, from which the client will send n-messages back. During the time when the client is streaming back messages, there can be many messages (potentially thousands), so during that time, I want to be careful to saturate the server with messages. I've been able to do this by blocking via while( ! requestStream.isReady()) { ... sleep ... } in my client's onNext() handler, then send the messages, but I notice that the CPU runs high. I tried to do something clever with having the client send all of it's messages to a BlockingQueue, and then have the onReady() handler take from the queue and send to the requestStream, but that deadlocks, because I think the the onReady callback and the client's onNext() are the same thread? So basically what I'd really like to have, is a callback that is managed by something that is not CPU intensive, and when triggered, my client can send whatever messages its accumulated over the last time isReady() was true. I hope that makes sense!

On Tuesday, January 2, 2018 at 11:48:28 AM UTC-5, Eric Gribkoff wrote:
If you're just wanting the client to send one request every time it receives a response, you can typically ignore #isReady() and let the automatic flow-control handle things. If you need to do something more complicated, like manually handling flow control, CallStreamObserver#setOnReadyHandler will let you register a callback when the #isReady() state changes from false to true. You can see an example using this in the manual flow control client example code, https://github.com/grpc/grpc-java/blob/c9b02db276403db4794c6e5ffc78b46889cd4ce8/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlClient.java#L70.


On Sun, Dec 31, 2017 at 7:07 PM, Matt Mitchell <good...@gmail.com> wrote:
Hi,

I'm using onReady() to initialize my client connection (a long lived bidi connection) and then using isReady() like:

public void onNext(MyMessage msg) {
  while( ! requestStream.isReady() ){ sleep-for-som-short-time }
  requestStream.request(1);
  requestStream.onNext(msg);
}

Which seems to work fine, but it also causes high CPU usage. Is there a better way to handle sending messages from the client to server without looping + sleeping like this?

Thanks,
- Matt

--
You received this message because you are subscribed to the Google Groups "grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email to grpc-io+u...@googlegroups.com.

matt.m...@lucidworks.com

unread,
Jan 26, 2018, 1:04:59 PM1/26/18
to grpc.io
I still haven't found a way around this. Since my client sends back a stream of results for each server request message, and the setOnReadyHandler call and onNext are on the same thread, the only way I can think of to not send a message back to the client, is to loop and wait for the requestStream to say it's ready.

In this example code, it's straight forward because the client sends requests whenever it wants to: https://github.com/grpc/grpc-java/blob/master/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlClient.java#L83

But in my case, I can only send back messages (potentially many hundreds/thousands) only when the client receives a message from the server; 1 request from the server = many responses from the client. So I'm unsure as to how to use the onReady in this case since it's disconnected from the client onNext call. So the solution I have now is to loop and sleep, but it's reducing throughput and pushing the CPIU. Is there a better way?

- Matt

Eric Anderson

unread,
Jan 26, 2018, 2:32:00 PM1/26/18
to Matt Mitchell, Matt Mitchell, grpc.io
On Wednesday, January 3, 2018 at 7:32:42 PM UTC-5, Matt Mitchell wrote:
The way my service works is the client will establish a bidi stream. The server then sends a message, from which the client will send n-messages back. 

So the "starter" code to that (which we'll need to improve), I'd expect to be closer to:
public void onNext(MyMessage msgFromServer) {
  for (MyMessage msgToServer : process(msgFromServer)) {
    while(!requestStream.isReady()) {sleep();}
    requestStream.onNext(msg);
  }
  // Doesn't actually do much; should just use autoInboundFlowControl
  requestStream.request(1);
}

Disabling auto inbound flow control (which I'm assuming is done because the request() is there) doesn't do anything here since the automatic inbound flow control calls request(1) when onNext returns (thus, you've manually implemented the "automatic" flow control).

Note that this could also be okay if you have all the response messages already (you can't push-back on the message generation):
public void onNext(MyMessage msgFromServer) {
  List<MyMessage> msgs = process(msgFromServer);
  for (MyMessage msgToServer : msgs) {
    requestStream.onNext(msg);
  }
  while(!requestStream.isReady()) {sleep();}
  // Doesn't actually do much; should just use autoInboundFlowControl
  requestStream.request(1);
}

If you already have the message, I tend to recommend you go ahead and send it. There's some exceptions to that, but they get special-case-y. The point of isReady/onReady is to slow down the message producer, so if they are already produced might as well enqueue them...

On Fri, Jan 26, 2018 at 10:04 AM, <matt.m...@lucidworks.com> wrote:
Since my client sends back a stream of results for each server request message, and the setOnReadyHandler call and onNext are on the same thread

The answer is to return from onNext so you can receive the onReady callbacks. Yes, that can be a pain, but it is an async API. (We're looking into making a blocking streaming API, which would make this sort of thing much easier. But such a thing requires effort to avoid deadlock.)

So let's look at the case where the client generates the messages all-at-once:
stub.someMethod(new ClientResponseObserver<MyReq, MyResp>() {
  private ClientCallStreamObserver<MyReq> requestStream;
  private boolean needRequest;

  @Override public void beforeStart(
      ClientCallStreamObserver<MyReq> requestStream) {
    this.requestStream = requestStream;
    requestStream.disableAutoInboundFlowControl();
    requestStream.setOnReadyHandler(() -> onReady());
  }

  private void onReady() {
    if (requestStream.isReady() && needRequest) {
      requestStream.request(1);
      needRequest = false;
    }
  }

  @Override public void onNext(MyResp msgFromServer) {
    // We can request messages from the server when flow control is ready, so it
    // should be ready now.
    List<MyMessage> msgs = process(msgFromServer);
    for (MyMessage msgToServer : msgs) {
      requestStream.onNext(msg);
    }
    if (requestStream.isReady()) {
      requestStream.request(1);
    } else {
      needRequest = true;
    }
  }
});

And now for further optimizing the client to avoid generating messages until we can send them:
stub.someMethod(new ClientResponseObserver<MyReq, MyResp>() {
  private ClientCallStreamObserver<MyReq> requestStream;
  private Iterator<MyReq> requests;

  @Override public void beforeStart(
      ClientCallStreamObserver<MyReq> requestStream) {
    this.requestStream = requestStream;
    requestStream.disableAutoInboundFlowControl();
    requestStream.setOnReadyHandler(() -> onReady());
  }

  private void onReady() {
    if (requests == null) {
      return;
    }
    while (requestStream.isReady() && requests.hasNext()) {
      // We only generate a message if we can send it
      MyReq msg = requests.next();
      requestStream.onNext(msg);
    }
    if (!requests.hasNext()) {
      requests = null;
      requestStream.request(1);
    }
  }

  @Override public void onNext(MyResp msgFromServer) {
    assert requests == null;
    requests = process(msgFromServer);
    onReady(); // no-op if not ready
  }
});
Reply all
Reply to author
Forward
0 new messages