Terminate grpc stream

521 views
Skip to first unread message

suvh...@gmail.com

unread,
Oct 17, 2017, 5:27:32 AM10/17/17
to grpc.io
Hello,

 I have a use case to have a stream continuously opened between server and the client. The server uses the stream to push data to the client. However, we have certain scenarios wherein we might need to terminate the stream. I tried using the following wherein I could find that the client gets closed but the server still continues to push the data. 

CancellableContext withCancellation = Context.current().withCancellation();
withCancellation.run(new Runnable() {
@Override
public void run() {
nonblockingStub.listFeatures(request, responseObserver);
}
});

Thread.sleep(15000);
withCancellation.cancel(null);
Thread.sleep(15000);

Also would like to know if there is an option for an idle timeout for a stream wherein if there are no traffic on the stream for a specified duration the stream gets terminated something similar to keepAlive for ManagedChannelBuilder.

Note: We are having a server side streaming only.


Waiting eagerly for your replies. :)

Regards
Suv


Ryan Michela

unread,
Oct 17, 2017, 5:58:33 PM10/17/17
to grpc.io

Something like this might work (untested):

public abstract class CancelableClientCallStreamObserver<TResp>
       
extends ClientCallStreamObserver<TResp>
       
implements ClientResponseObserver<Object, TResp> {


   
private ClientCallStreamObserver requestStream;


   
@Override
   
public void cancel(@Nullable String message, @Nullable Throwable cause) {
       
if (requestStream != null) {
            requestStream
.cancel(message, cause);
       
}
   
}


   
@Override
   
public void beforeStart(ClientCallStreamObserver requestStream) {
       
this.requestStream = requestStream;
   
}
}

You would use it like this:

CancelableClientCallStreamObserver<ResponseType> responseObserver = new CancelableClientCallStreamObserver<>() {
 
@Override
 
public void onNext(TResp value) {
   
...
 
}

 
@Override
 
public void onError(Throwable t) {
   
...
 
}

 
@Override
 
public void onCompleted() {
   
...
 
}
}

nonblockingStub
.listFeatures(request, responseObserver);

Thread.sleep(15000);
responseObserver.cancel();

suvhrajit basak

unread,
Oct 19, 2017, 1:20:01 AM10/19/17
to Ryan Michela, grpc.io
H Ryan,

Thank you for your prompt response. I have a doubt about the example you have cited. I believe that the "beforeStart" method needs to be call and needs a request stream. In my case I don't have a request stream as it's a server side streaming. Pls. do let me know if there any alternate ways to achieve our use case.

Thanks,

Suvhrajit Basak

--
You received this message because you are subscribed to a topic in the Google Groups "grpc.io" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/grpc-io/mC9Vm3fRnNU/unsubscribe.
To unsubscribe from this group and all its topics, send an email to grpc-io+unsubscribe@googlegroups.com.
To post to this group, send email to grp...@googlegroups.com.
Visit this group at https://groups.google.com/group/grpc-io.
To view this discussion on the web visit https://groups.google.com/d/msgid/grpc-io/413af003-5fd8-49f8-b779-b0bbb5951ab7%40googlegroups.com.

For more options, visit https://groups.google.com/d/optout.

Ryan Michela

unread,
Oct 19, 2017, 12:28:26 PM10/19/17
to grpc.io
To unsubscribe from this group and all its topics, send an email to grpc-io+u...@googlegroups.com.

To post to this group, send email to grp...@googlegroups.com.
Visit this group at https://groups.google.com/group/grpc-io.
Reply all
Reply to author
Forward
0 new messages