How to cancel a server-to-client stream?


bean...@googlemail.com

Mar 21, 2015, 7:26:21 PM
to grp...@googlegroups.com
It looks at the moment like the way to interact with a server-to-client stream is either with a BlockingStub or with just a Stub which I see is used for async operations.

How can the client cancel a stream from the server part way through when using the async stub?

I am thinking of using a server-to-client stream to return search results. Instead of having to bother with pagination, I want to return a stream, and let the client display as many items as the user wants to see as they scroll through the list, and terminate the stream when they click on an item. Obviously, in this case the BlockingStub is no use because I don't want to load a potentially large dataset into memory first.

The async Stub implementation looks almost useful, but it's not quite clear to me what's going on. I can see that a new StreamObserverToCallListenerAdapter is created which has both the `call` and `observer` instances in it, but I can't quite unpick the listener to see whether it's possible to get hold of the `call` object.

So, how can I cancel a server-to-client stream from the client? Can I get hold of the `call` object, and then invoke `call.cancel()` when I want, or can I just call `streamObserver.onCompleted()` whenever I want instead?

Thanks in advance.

Eric Anderson

Mar 22, 2015, 6:46:39 PM
to bean...@googlemail.com, grpc-io
On Sat, Mar 21, 2015 at 4:26 PM, <bean...@googlemail.com> wrote:
It looks at the moment like the way to interact with a server-to-client stream is either with a BlockingStub or with just a Stub which I see is used for async operations.

How can the client cancel a stream from the server part way through when using the async stub?

For single requests, you can't. For bi-directional streaming, you can call onError if you have not already called onCompleted. The API is simply too simple for this need; an API closer to reactive-streams than to RxJava would solve this problem, but that has its own issues.

So my suggested solution is to use Call directly. You can use YourServiceGrpc.CONFIG.yourMethod to access a code-generated MethodDescriptor, which you can then pass to Channel.newCall. It shouldn't be much additional effort.

I am thinking of using a server-to-client stream to return search results. Instead of having to bother with pagination, I want to return a stream, and let the client display as many items as the user wants to see as they scroll through the list, and terminate the stream when they click on an item.

Oh, that makes sense. And flow control nicely pushes back on the server when the user hasn't needed to see more.

Obviously, in this case the BlockingStub is no use because I don't want to load a potentially large dataset into memory first.

BlockingStub only consumes an additional Thread. Otherwise it would have the same memory usage as async Stub (or better, because outbound flow control can be integrated (once we implement it...)). Calling next() on the iterator actually requests an additional message, for a future call to next(), via flow control.
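
To make the "next() requests one more message" behaviour concrete, here is a minimal Java sketch. It is a toy model and not gRPC code: ToySource and PrefetchingIterator are hypothetical names made up for illustration, and the real iterator blocks on the network rather than pulling from an in-memory source.

```java
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;

/** Hypothetical stand-in for a server stream: it produces a message only when asked. */
final class ToySource {
    private final int total;
    private int produced = 0;

    ToySource(int total) { this.total = total; }

    /** Delivers one more message into the buffer, if any are left. */
    void request(Queue<Integer> buffer) {
        if (produced < total) {
            buffer.add(produced++);
        }
    }

    int producedSoFar() { return produced; }
}

/** Iterator that keeps exactly one message requested ahead of the consumer. */
final class PrefetchingIterator implements Iterator<Integer> {
    private final ToySource source;
    private final Queue<Integer> buffer = new ArrayDeque<>();

    PrefetchingIterator(ToySource source) {
        this.source = source;
        source.request(buffer); // initial request so the first next() has data
    }

    @Override public boolean hasNext() { return !buffer.isEmpty(); }

    @Override public Integer next() {
        Integer value = buffer.poll();
        if (value == null) throw new NoSuchElementException();
        source.request(buffer); // ask for one more, for a future call to next()
        return value;
    }
}
```

In this model, after three calls to next() the source has produced four messages, no matter how many it could produce in total. That is the sense in which memory use stays bounded when the consumer stops iterating.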

So, how can I cancel a server-to-client stream from the client? Can I get hold of the `call` object, and then invoke `call.cancel()` when I want, or can I just call `streamObserver.onCompleted()` whenever I want instead?

Both great questions. I think I answered them above.

bean...@googlemail.com

Mar 24, 2015, 5:53:22 AM
to grp...@googlegroups.com, bean...@googlemail.com
Thanks for your reply, Eric (not sure why I have to reply to my own message instead of yours here...). It took me a little while to understand because of how I phrased my question.

If I rephrased my question as "Should I use an async stub or a blocking stub for search results?", you're saying I should use a blocking stub and just return the iterator, which will cause the underlying channel to request the next result from the stream as it's consumed? That would be what I'm looking for.

I think you're also suggesting that I call the following directly in my client code instead of using the generated method, so I have access to the 'call' object:

Call call = channel.newCall(config.listEvents);
blockingServerStreamingCall(call, request);

If the user wanted to run a different query I could invoke `call.cancel()` before making a new call.

Assuming I've understood correctly, thanks for your help.

Eric Anderson

Mar 24, 2015, 10:26:41 AM
to bean...@googlemail.com, grpc-io
On Tue, Mar 24, 2015 at 2:53 AM, <bean...@googlemail.com> wrote:
If I rephrased my question as "Should I use an async stub or a blocking stub for search results?", you're saying I should use a blocking stub and just return the iterator, which will cause the underlying channel to request the next result from the stream as it's consumed? That would be what I'm looking for.

Yes. Blocking does everything you need, it seems. Cancellation with BlockingStub is done via Thread interruption, FYI.
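
As a rough illustration of cancelling a blocked consumer via Thread interruption, here is a Java sketch that uses only the standard library. It models the thread-level mechanics, not gRPC internals: the queue stands in for a stream that has no result ready yet, and InterruptCancelDemo is a hypothetical name.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

final class InterruptCancelDemo {
    /** Returns true if the consumer thread stopped because it was interrupted. */
    static boolean runAndInterrupt() {
        // Never filled: models a stream with no result available yet.
        BlockingQueue<String> results = new LinkedBlockingQueue<>();
        AtomicBoolean cancelled = new AtomicBoolean(false);

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    results.take(); // blocks waiting for the next result
                }
            } catch (InterruptedException e) {
                cancelled.set(true); // interruption is treated as cancellation
            }
        });
        consumer.start();
        consumer.interrupt(); // e.g. the user picked an item or ran a new query
        try {
            consumer.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException("caller was interrupted while waiting", e);
        }
        return cancelled.get();
    }
}
```

The consuming thread needs no extra cancellation flag here; whoever holds a reference to the thread can stop the wait by interrupting it.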

I think you're also suggesting that I call the following directly in my client code instead of using the generated method, so I have access to the 'call' object:

I am suggesting that if blocking doesn't work for you (for whatever reason), and you really want async, then using Call directly is a fine (and supported) way of being able to use an Async API while also being able to cancel.

If the user wanted to run a different query I could invoke `call.cancel()` before making a new call.

Yes.
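
The "cancel the old call before starting a new query" pattern can be sketched as follows. This is a minimal sketch under stated assumptions: CancellableCall is a hypothetical one-method interface standing in for whatever call handle the client holds, and SearchSession is an illustrative name; neither is a gRPC type.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical minimal view of an in-flight call; not a gRPC type. */
interface CancellableCall {
    void cancel();
}

/** Tracks the current search call and cancels it when a new query starts. */
final class SearchSession {
    private final AtomicReference<CancellableCall> current = new AtomicReference<>();

    /** Registers the call for a new query, cancelling the previous one if any. */
    void startQuery(CancellableCall newCall) {
        CancellableCall previous = current.getAndSet(newCall);
        if (previous != null) {
            previous.cancel();
        }
    }
}
```

Swapping the reference atomically means each call is cancelled at most once by this class, even if two queries are started from different threads.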

bean...@googlemail.com

Mar 25, 2015, 8:07:58 AM
to grp...@googlegroups.com, bean...@googlemail.com
Got it, thanks!

Eric Anderson

Apr 6, 2015, 6:14:23 PM
to bean...@googlemail.com, grpc-io
On Sun, Mar 22, 2015 at 3:46 PM, Eric Anderson <ej...@google.com> wrote:
On Sat, Mar 21, 2015 at 4:26 PM, <bean...@googlemail.com> wrote:
I am thinking of using a server-to-client stream to return search results. Instead of having to bother with pagination, I want to return a stream, and let the client display as many items as the user wants to see as they scroll through the list, and terminate the stream when they click on an item.

Oh, that makes sense. And flow control nicely pushes back on the server when the user hasn't needed to see more.

I got to thinking that you should know one more thing: flow control will push back nicely, but the server will generate more responses than necessary before flow control kicks in. Right now we buffer up to 64 KB, and on high-latency, high-bandwidth links (which are uncommon for end users but may be common in data centers) we will need to buffer even more. If the responses are small, then many of them may be buffered and end up not being used.
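
For a rough sense of scale, the number of responses that could sit in such a buffer can be estimated with simple division. The sketch below assumes 64 KB means 65,536 bytes and ignores any per-message framing overhead; BufferEstimate and the 200-byte response size are illustrative assumptions, not figures from gRPC.

```java
final class BufferEstimate {
    /** Upper bound on whole responses that fit in the buffer; ignores framing overhead. */
    static long bufferedResponses(long bufferBytes, long responseBytes) {
        if (responseBytes <= 0) {
            throw new IllegalArgumentException("responseBytes must be positive");
        }
        return bufferBytes / responseBytes;
    }
}
```

With these assumptions, BufferEstimate.bufferedResponses(64 * 1024, 200) returns 327, so a server sending 200-byte search results might generate a few hundred of them before flow control pushes back.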

Abhishek Parmar

Apr 17, 2015, 4:04:56 AM
to Eric Anderson, bean...@googlemail.com, grpc-io
What is the right way to do the same thing (cancel a server-streaming RPC) from a sync C++ client?

Calling ClientContext::TryCancel followed by ClientReader::Finish results in this failed assertion (I am using grpc code from githash c637cc922fa5):

E0417 00:58:06.489979859    9215 channel.cc:78] assertion failed: GRPC_CALL_OK == grpc_call_start_batch(call->call(), ops, nops, buf)
*** Aborted at 1429257486 (unix time) try "date -d @1429257486" if you are using GNU date ***
PC: @     0x7f9a849e0cc9 (unknown)
*** SIGABRT (@0x3e8000023ff) received by PID 9215 (TID 0x7f9a85cf07c0) from PID 9215; stack trace: ***
    @     0x7f9a858d4340 (unknown)
    @     0x7f9a849e0cc9 (unknown)
    @     0x7f9a849e40d8 (unknown)
    @      0x4c843f grpc::Channel::PerformOpsOnCall()
    @       0x4119ab grpc::ClientReader<>::Finish()



--
-Abhishek

Abhishek Kumar

Apr 17, 2015, 1:22:33 PM
to Abhishek Parmar, Eric Anderson, bean...@googlemail.com, grpc-io
Even when you Cancel at the client, you should continue reading till ClientReader::Read() returns false. Only then is it correct to call ClientReader::Finish().

The above sequence is required to deal with server to client messages that race with the cancellation.
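
The ordering described above (cancel, keep reading until the read reports end of stream, then finish) can be modelled in a few lines. The sketch is written in Java as a toy, not against the gRPC C++ API: ToyReader and DrainThenFinish are hypothetical names, and the rule that finish() fails before the stream is drained is an assumption of the model that mirrors the advice in this message.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

/** Toy reader (not the gRPC C++ ClientReader): finish() is legal only after read() returned null. */
final class ToyReader {
    private final Queue<String> inFlight;
    private boolean drained = false;
    private boolean cancelled = false;

    /** alreadySent models messages that were in flight when the client cancelled. */
    ToyReader(List<String> alreadySent) {
        this.inFlight = new ArrayDeque<>(alreadySent);
    }

    /** Requests cancellation; messages already in flight can still be read. */
    void tryCancel() { cancelled = true; }

    /** Returns the next message, or null once the stream has ended. */
    String read() {
        String next = inFlight.poll();
        if (next == null) drained = true;
        return next;
    }

    /** Returns a status string; throws if the stream has not been drained yet. */
    String finish() {
        if (!drained) {
            throw new IllegalStateException("finish() called before read() reported end of stream");
        }
        return cancelled ? "CANCELLED" : "OK";
    }
}

final class DrainThenFinish {
    /** Cancels, discards messages that raced with the cancellation, then finishes. */
    static String cancelAndFinish(ToyReader reader) {
        reader.tryCancel();
        while (reader.read() != null) {
            // discard messages that raced with the cancellation
        }
        return reader.finish();
    }
}
```

In this model, calling finish() straight after tryCancel() throws, while draining first returns "CANCELLED".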

Abhishek Parmar

Apr 17, 2015, 2:44:04 PM
to Abhishek Kumar, Eric Anderson, bean...@googlemail.com, grpc-io
That still seems to fail the same way for me. Are there any examples of cancellation that I can try?
--
-Abhishek

Abhishek Kumar

Apr 17, 2015, 5:35:26 PM
to Abhishek Parmar, Eric Anderson, bean...@googlemail.com, grpc-io
So I tried to add a test and had mixed results. 

For server streaming and bidi streams, there are passing tests in this PR: https://github.com/grpc/grpc/pull/1307

For client streaming RPC, I did find a crash. Corresponding PR is: https://github.com/grpc/grpc/pull/1308 and ctiller is fixing the underlying issue in C core.

Coming back to your crash though - if it is in server streaming, then I do not see it in my test. Perhaps you can help by crafting a test case that recreates the crash.

-Abhishek

Abhishek Parmar

Apr 17, 2015, 6:08:48 PM
to Abhishek Kumar, Eric Anderson, bean...@googlemail.com, grpc-io
Will do. Perhaps I am on an older version of the code. I will try upgrading next week.
--
-Abhishek

Abhishek Parmar

Apr 19, 2015, 10:07:40 PM
to Abhishek Kumar, Eric Anderson, bean...@googlemail.com, grpc-io
I could not reproduce this after pulling from master. Thanks for looking into it.
--
-Abhishek
