[grpc-java] How to close stream by client for server-side streaming


Edmund

Oct 19, 2016, 8:08:45 PM
to grpc.io
Hello!

I am trying to close a server-streaming RPC from the client side, but I cannot find a way to do it without closing the whole connection or channel. The proto file has a service like this:

rpc ListFeatures(Rectangle) returns (stream Feature) {}

When I use stub.listFeatures(), it does not return a StreamObserver the way a bi-directional streaming call does, so I cannot call onCompleted() to close the stream without closing the channel.

Any thoughts?

* I also tried using a blockingStub with a loop and a flag. However, it gets stuck in hasNext() while waiting for messages, so I cannot break out with the flag.

Robert Bielik

Oct 21, 2016, 6:28:39 AM
to grpc.io, edfu...@gmail.com
Hi,

stub.ListFeatures(..) will return an object from which you can read the streamed features. When you destroy that object, the server's "write" object will fail when doing writeObj.Write(Feature). That way the server knows the connection has been severed.

Regards
/R

Robert Bielik

Oct 21, 2016, 6:29:29 AM
to grpc.io, edfu...@gmail.com
Btw, I use this mechanism for streaming audio packets and it works nicely.

Eric Anderson

Oct 31, 2016, 12:33:39 PM
to Edmund, grpc.io
The blocking stub observes thread interruption; if the thread is interrupted, it will cancel the RPC.
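
To illustrate the interruption pattern without a gRPC dependency, here is a minimal JDK-only sketch. BlockingFeed is a hypothetical stand-in for the iterator a blocking stub returns (its hasNext() blocks until a message arrives), and InterruptibleConsumer and consumeThenInterrupt are names made up for this example. With a real blocking stub, the loop would iterate over the result of blockingStub.listFeatures(rectangle), and as far as I know the interrupted call surfaces as a StatusRuntimeException with status CANCELLED rather than the IllegalStateException used here.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

public class InterruptibleConsumer {

  /** Hypothetical stand-in for a blocking stub's iterator: hasNext() blocks until a message arrives. */
  static final class BlockingFeed implements Iterator<String> {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private String next;

    void offer(String message) {
      queue.add(message);
    }

    @Override public boolean hasNext() {
      if (next != null) {
        return true;
      }
      try {
        next = queue.take(); // blocks here, like waiting on the server
        return true;
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt flag
        throw new IllegalStateException("cancelled by interrupt", e);
      }
    }

    @Override public String next() {
      if (!hasNext()) {
        throw new NoSuchElementException();
      }
      String message = next;
      next = null;
      return message;
    }
  }

  /** Drains the feed on a worker thread, interrupts it, and reports whether the loop ended by interruption. */
  static boolean consumeThenInterrupt() {
    BlockingFeed feed = new BlockingFeed();
    AtomicBoolean cancelled = new AtomicBoolean(false);
    Thread worker = new Thread(() -> {
      try {
        while (feed.hasNext()) {
          feed.next(); // handle the message here
        }
      } catch (IllegalStateException e) {
        cancelled.set(true); // the interrupt broke us out of hasNext()
      }
    });
    worker.start();
    feed.offer("feature-1");
    worker.interrupt(); // the "close the stream" signal from the client side
    try {
      worker.join(5000);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
    return cancelled.get() && !worker.isAlive();
  }
}
```

The point of the design is that the flag is replaced by the interrupt itself: the consuming loop lives on its own thread, and whoever wants to stop the stream calls interrupt() on that thread instead of setting a flag the blocked loop never gets to check.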

For async stub, you can create a CancellableContext and cancel it:

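import io.grpc.Context;
import io.grpc.Context.CancellableContext;

// Start the RPC with the cancellable context attached, so the call is bound to it.
CancellableContext withCancellation = Context.current().withCancellation();
withCancellation.run(new Runnable() {
  @Override public void run() {
    stub.listFeatures(rectangle, observer);
  }
});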

When the RPC completes, make sure to call withCancellation.cancel(null); otherwise you can start retaining memory.

When you want to cancel the RPC, call withCancellation.cancel(someEx). The exception is only used for your own debugging, since without it, it would be hard to tell where cancellations come from.
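
One way to make sure the context is always released when the RPC ends is to wrap the response observer. The following is a sketch under assumptions, not part of grpc-java: CancellableStream and releasing are hypothetical names, and it needs the grpc-java context and stub artifacts on the classpath. It relies only on Context.withCancellation(), CancellableContext.cancel(Throwable), and the StreamObserver interface.

```java
import io.grpc.Context;
import io.grpc.Context.CancellableContext;
import io.grpc.stub.StreamObserver;

/**
 * Hypothetical helper. Intended usage, assuming a generated async stub:
 *
 *   CancellableContext ctx = Context.current().withCancellation();
 *   ctx.run(() -> stub.listFeatures(rectangle, CancellableStream.releasing(ctx, observer)));
 *   // later, to stop the stream from the client side:
 *   ctx.cancel(new RuntimeException("client no longer needs the stream"));
 */
public class CancellableStream {

  /** Wraps an observer so the context is cancelled (released) once the RPC terminates. */
  static <T> StreamObserver<T> releasing(CancellableContext ctx, StreamObserver<T> delegate) {
    return new StreamObserver<T>() {
      @Override public void onNext(T value) {
        delegate.onNext(value);
      }

      @Override public void onError(Throwable t) {
        try {
          delegate.onError(t);
        } finally {
          ctx.cancel(null); // RPC is over; release the context
        }
      }

      @Override public void onCompleted() {
        try {
          delegate.onCompleted();
        } finally {
          ctx.cancel(null); // RPC is over; release the context
        }
      }
    };
  }
}
```

Cancelling an already cancelled context has no further effect, so it should not matter that the wrapper calls cancel(null) again after a client-initiated cancel reaches onError.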

--
You received this message because you are subscribed to the Google Groups "grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email to grpc-io+unsubscribe@googlegroups.com.
To post to this group, send email to grp...@googlegroups.com.
Visit this group at https://groups.google.com/group/grpc-io.
To view this discussion on the web visit https://groups.google.com/d/msgid/grpc-io/31ef938c-f3c1-4a88-a73c-f6f0ae15e1c2%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

edfu...@gmail.com

Nov 2, 2016, 7:30:35 PM
to grpc.io, edfu...@gmail.com
Awesome, thanks! That was what I was looking for.

