[python] Cannot cancel server stream response from client side when there is a client-side interceptor.


miketw...@gmail.com

Jan 2, 2022, 11:23:17 AM
to grpc.io
Hi, I develop a desktop application that allows transmission of files between computers on a local network.

A feature I recently added is compression during the transfer. This seemed like an ideal candidate for interceptors, which could handle compression/decompression of the stream transparently (and allow a simple passthrough when the two clients don't both support or want compression).

The problem I've run into is when the client (receiver) wants to cancel the transfer. Prior to adding the interceptor layer, I simply called response.cancel(), which would raise a cancel exception on the server.

With the addition of my client-side interceptor (to decompress the stream just before the 'real' client receives it), I'm returning a simple generator instead, which is not a Future like the original response. What I'm doing at the moment is returning both the new generator as well as the original response (future) as a 'cancellable' object. This is ugly because now the stub method 'StartTransfer' no longer returns a single object like the proto definition declares:
...
rpc StartTransfer(OpInfo) returns (stream FileChunk) {}
...
but I'm actually calling this:
...
op.file_iter_cancellable, op.file_iterator = self.stub.StartTransfer( ...
...

Here is the client interceptor to show my current workaround:

import logging
import zlib

import grpc

class ChunkDecompressor(grpc.UnaryStreamClientInterceptor):
    def __init__(self):
        pass

    # Intercept the RPC response after it returns from the server but before it reaches
    # the remote.
    def intercept_unary_stream(self, continuation, client_call_details, request):
        # Only intercept transfer ops.
        if client_call_details.method != "/Warp/StartTransfer":
            return continuation(client_call_details, request)

        try:
            use_comp = request.use_compression
        except AttributeError:
            use_comp = False

        logging.debug("Transfer using compression: %d", use_comp)

        # We always need to return the original response along with
        # whatever the remote will be iterating over. If there's no
        # compression, we just return the same response twice.
        if not use_comp:
            response = continuation(client_call_details, request)
            return response, response

        def decomp_stream(response):
            # Inbound response (returned from continuation())
            for chunk in response:
                try:
                    if not chunk.chunk:
                        yield chunk
                    else:
                        dcchunk = zlib.decompress(chunk.chunk)
                        chunk.chunk = dcchunk
                        yield chunk
                except Exception as e:
                    logging.warning("Decompression error: %s" % e)
                    # this will go to remote.start_transfer_op()'s handler.
                    raise

        response = continuation(client_call_details, request)

        # With compression, decomp_stream() returns a plain generator,
        # which has no cancel() method. The original response is still
        # needed so the caller can call cancel() on it.

        return response, decomp_stream(response)

Is there some other way I can accomplish this? I'm fine keeping it this way - this is not any sort of public api. I just have a low-level discomfort about my workaround and that usually means I'm doing it wrong :)

Thanks!

miketw...@gmail.com

Jan 2, 2022, 8:38:35 PM
to grpc.io
I realized that I can create a generator class to wrap the original response and expose a cancel() method as well. I noticed the documentation didn't seem to be all that strict about interceptor return types.

Thanks for the help! :)

miketw...@gmail.com

Jan 2, 2022, 9:36:27 PM
to grpc.io
For completeness, I ended up with this class to replace the original response:

class StreamReponseWrapper():
    def __init__(self, response):
        self.response = response

    def __iter__(self):
        return self

    def __next__(self):
        # StopIteration from the underlying response ends this iterator too.
        chunk = next(self.response)

        try:
            if not chunk.chunk:
                return chunk

            else:
                dcchunk = zlib.decompress(chunk.chunk)
                chunk.chunk = dcchunk
                return chunk

        except Exception as e:
            logging.warning("Decompression error: %s" % e)
            # this will go to remote.start_transfer_op()'s handler.
            raise

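    # Forward cancellation to the original response, so callers can
    # still call cancel() on the object the interceptor returns.
    def cancel(self):
        return self.response.cancel()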
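
A possible generalization of the wrapper above, as a rough sketch only: instead of forwarding just cancel(), it forwards every attribute it doesn't define itself to the original response, so other call methods (for example code() or details()) would presumably keep working too. The names CancellableStream, decompress_chunk and FakeCall are made up for illustration; FakeCall is only a stand-in so the snippet runs without a server, and none of this has been tested against a real gRPC channel.

```python
import zlib
from types import SimpleNamespace


class CancellableStream:
    """Iterate over transformed messages while keeping the call's other methods."""

    def __init__(self, call, transform):
        self._call = call            # original response returned by continuation()
        self._transform = transform  # applied to each message before it is returned

    def __iter__(self):
        return self

    def __next__(self):
        # StopIteration and errors from the underlying call propagate unchanged.
        return self._transform(next(self._call))

    def __getattr__(self, name):
        # Only called for attributes not defined on this class, e.g. cancel().
        if name.startswith("_"):
            raise AttributeError(name)
        return getattr(self._call, name)


def decompress_chunk(msg):
    # Empty chunks pass through untouched, as in the wrapper above.
    if msg.chunk:
        msg.chunk = zlib.decompress(msg.chunk)
    return msg


class FakeCall:
    """Hypothetical stand-in for the gRPC response object (assumption, not gRPC API)."""

    def __init__(self, messages):
        self._it = iter(messages)
        self.cancelled = False

    def __iter__(self):
        return self

    def __next__(self):
        if self.cancelled:
            raise StopIteration
        return next(self._it)

    def cancel(self):
        self.cancelled = True
        return True


call = FakeCall([
    SimpleNamespace(chunk=zlib.compress(b"hello")),
    SimpleNamespace(chunk=b""),
])
stream = CancellableStream(call, decompress_chunk)
first = next(stream)      # decompressed first message
stream.cancel()           # forwarded to FakeCall.cancel() via __getattr__
remaining = list(stream)  # nothing more is delivered after cancelling
```

With something like this, the interceptor could return CancellableStream(response, decompress_chunk) as a single object, and the caller would go back to one return value from StartTransfer.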