Bi-Directional Python gRPC

David Matson

Apr 10, 2018, 5:57:01 PM
to grpc.io
I have a bi-directional gRPC that writes to a file and returns responses. Protobuf is defined roughly like this:

file.proto

rpc Write(stream WriteRequest) returns (stream WriteResponse);

message FileData {
    int64 offset = 1;
    bytes data = 2;
}

message WriteRequest {
    repeated FileData extents = 1;
}

message WriteResponse {
    int64 throttle = 1;
}

The issue I'm running into is that I need to run special logic on the last request in the iterator before returning a response (e.g. flush buffers). The only way I can think to do that is to peek ahead in the iterator, but iterator.next() blocks forever if I haven't returned a response since the last time I called it.

file.py

def Write(self, request_iterator, context):
    request = request_iterator.next()
    next_request = request

    while True:
        request = next_request
        for extent in request.extents:
            # Write data

        # This call blocks forever
        next_request = request_iterator.next()
        if not next_request:
            # Do important stuff before client unblocks
            yield WriteResponse(throttle=0)
            break
        else:
            yield WriteResponse(throttle=0)

All I'm really trying to accomplish is running some logic at the very end of the RPC before the client unblocks. If I were using the Java implementation, I would have access to ResponseObserver.onCompleted(), but it seems like with Python, RPC completion is just implied by yielding the response that corresponds to the last request in the request iterator.

Is there some way to do this?

Nathaniel Manista

Apr 11, 2018, 1:04:33 AM
to David Matson, grpc.io
On Tue, Apr 10, 2018 at 2:57 PM, David Matson <dma...@gmail.com> wrote:
I have a bi-directional gRPC that writes to a file and returns responses. Protobuf is defined roughly like this:

file.proto

rpc Write(stream WriteRequest) returns (stream WriteResponse);

message FileData {
    int64 offset = 1;
    bytes data = 2;
}

message WriteRequest {
    repeated FileData extents = 1;
}

message WriteResponse {
    int64 throttle = 1;
}

The issue I'm running into is that I need to run special logic on the last request in the iterator before returning a response (e.g. flush buffers). The only way I can think to do that is to peek ahead in the iterator

Is adding a boolean "is_last_request_in_stream" field to your WriteRequest message possible? If so, would it afford the information that you need at the time that you need it?

, but iterator.next() blocks forever if I haven't returned a response since the last time I called it.

file.py

def Write(self, request_iterator, context):
    request = request_iterator.next()

Why request_iterator.next() rather than next(request_iterator)?


    next_request = request

    while True:
        request = next_request
        for extent in request.extents:
            # Write data

        # This call blocks forever

Forever? It should only block until the remote party on the other side of the RPC either sends another request or indicates the end of the request stream.


        next_request = request_iterator.next()

(Same question here about .next() rather than next(<...>).)


       
        if not next_request:

What value assigned to next_request satisfies the condition "not next_request"?


           
            # Do important stuff before client unblocks
            yield WriteResponse(throttle=0)
            break

How certain are you that this break statement is ever executed?


       
        else:
            yield WriteResponse(throttle=0)
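These questions matter because of how the request iterator signals the end of the stream. It raises StopIteration; it does not return a falsy value, so the "if not next_request:" branch can never fire. Also, since Python 3.7 (PEP 479), a StopIteration that escapes inside a generator body is converted into a RuntimeError. The sketch below reproduces the structure of the loop. It is a simplification, not the gRPC runtime: a plain list iterator stands in for the request iterator, and strings stand in for WriteResponse.

```python
def write_like_original(request_iterator):
    # Same shape as the original servicer loop, with strings for messages.
    next_request = next(request_iterator)
    while True:
        request = next_request
        next_request = next(request_iterator)  # raises StopIteration at end of stream
        if not next_request:
            # Never reached: the iterator raises instead of returning a falsy value.
            yield "final response"
            break
        else:
            yield "response to " + request


responses = []
error = None
try:
    for response in write_like_original(iter(["a", "b"])):
        responses.append(response)
except RuntimeError as exc:  # PEP 479: StopIteration inside a generator
    error = exc
```

In this reconstruction, the response to the last request ("b") is never produced, and the break statement is never executed.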

All I'm really trying to accomplish is running some logic at the very end of the RPC before the client unblocks. If I were using the Java implementation, I would have access to ResponseObserver.onCompleted(), but it seems like with Python, RPC completion is just implied by yielding the response that corresponds to the last request in the request iterator.

Is there some way to do this?

This sounds like a use case that we definitely didn't anticipate, but... it's still rather surprising that it could matter somehow whether or not something was done before the end of the stream became known versus after the end of the stream became known.

I suspect that there's something else going on.
-N

David Matson

Apr 11, 2018, 12:24:45 PM
to grpc.io
Thanks for your response Nathaniel.

I get the same behavior with next(iterator) vs. iterator.next(). It's blocking forever because the client expects a response before it sends the next request. So when the server asks for the next request in the iterator while the client is waiting for a response, nothing is ever going to happen. This is my assumption anyway; I don't know the implementation details of the request iterator.

The issue matters for me because if I don't flush writes to disk before returning the final response the client will potentially read an incomplete file. But I don't want to flush before every response because it will hurt performance.

Your idea of adding a flag to designate a request as "last" in the proto would work; I was just hoping for a server-side solution.
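Here is a minimal sketch of the flag-based approach. It assumes a hypothetical is_last boolean field added to WriteRequest. The dataclasses stand in for the generated message classes, and write_extent and flush are illustrative callables; none of these names come from the original proto or from grpc. In a real servicer, the body would live inside Write(self, request_iterator, context).

```python
from dataclasses import dataclass, field
from typing import Callable, Iterable, Iterator, List


@dataclass
class FileData:  # stand-in for the generated FileData message
    offset: int
    data: bytes


@dataclass
class WriteRequest:  # stand-in; is_last is a hypothetical new field
    extents: List[FileData] = field(default_factory=list)
    is_last: bool = False


@dataclass
class WriteResponse:  # stand-in for the generated WriteResponse message
    throttle: int = 0


def write(request_iterator: Iterable[WriteRequest],
          write_extent: Callable[[FileData], None],
          flush: Callable[[], None]) -> Iterator[WriteResponse]:
    """One response per request; flush only when the client marks a request as last."""
    for request in request_iterator:
        for extent in request.extents:
            write_extent(extent)
        if request.is_last:
            flush()  # runs before the final response is yielded
        yield WriteResponse(throttle=0)
```

The trade-off is that correctness now depends on every client setting the flag. The server cannot detect a client that forgets to set it.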

For example, some way of using callbacks like onNext and onCompleted, as in Java, rather than the generator paradigm from the Python documentation.
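The synchronous Python servicer API is iterator-based rather than observer-based, but a callback style can be layered on top of it. The sketch below uses a hypothetical helper, drive_callbacks, which is not part of grpc. on_next returns the response for each request. on_completed runs once the request stream has been closed, before the generator returns and the RPC finishes.

```python
from typing import Callable, Iterable, Iterator, TypeVar

Req = TypeVar("Req")
Resp = TypeVar("Resp")


def drive_callbacks(request_iterator: Iterable[Req],
                    on_next: Callable[[Req], Resp],
                    on_completed: Callable[[], None]) -> Iterator[Resp]:
    """Hypothetical adapter from the iterator model to onNext/onCompleted-style callbacks."""
    for request in request_iterator:
        # Called once per incoming request; the return value is streamed back.
        yield on_next(request)
    # The loop exits only after the client closes the request stream,
    # so this runs before the generator (and hence the RPC) finishes.
    on_completed()
```

A servicer could then return drive_callbacks(request_iterator, self.handle, self.flush) from Write, where handle and flush are again hypothetical methods.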

Nathaniel Manista

Apr 11, 2018, 12:51:27 PM
to David Matson, grpc.io
On Wed, Apr 11, 2018 at 9:24 AM, David Matson <dma...@gmail.com> wrote:
I get the same behavior with next(iterator) vs. iterator.next().

This is expected with Python 2. As for Python 3: the next method isn't even supposed to be present on the iterator object, so it's poor form to call it directly. If you write iterator.next in your code today, your code will break in 2020. :-P
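To illustrate the portability point, here is a plain Python 3 list iterator standing in for the request iterator. The built-in next() works on Python 2 and 3 alike and accepts an optional default for an exhausted iterator. The .next attribute does not exist at all on ordinary Python 3 iterators.

```python
requests = iter(["first", "second"])

# Built-in next() calls __next__ on Python 3 (and the next method on Python 2).
first = next(requests)

# With a default, next() returns it instead of raising StopIteration
# once the iterator is exhausted.
second = next(requests, None)
done = next(requests, None)

# The Python 2 method spelling is absent from Python 3 iterators.
has_next_method = hasattr(requests, "next")
```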

It's blocking forever because the client expects a response before it sends the next request. So when the server asks for the next request in the iterator while the client is waiting for a response, nothing is ever going to happen. This is my assumption anyway; I don't know the implementation details of the request iterator.

The issue matters for me because if I don't flush writes to disk before returning the final response the client will potentially read an incomplete file. But I don't want to flush before every response because it will hurt performance.

Why don't you flush writes to disk after your service-side code has received indication of the request stream being closed but before closing the response stream, concluding the RPC, and returning program control?
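Here is a sketch of that structure applied to the original Write. It iterates over the requests with a for loop and responds to each one without flushing. It then flushes once after the loop, which is reached only after the request stream closes. This is a sketch under assumptions. The dataclasses stand in for the generated messages. The open file f is passed in for brevity, whereas a real servicer would own it and would also take self and context. os.fsync is one assumed way to make the data durable.

```python
import os
from dataclasses import dataclass, field
from typing import BinaryIO, Iterable, Iterator, List


@dataclass
class FileData:  # stand-in for the generated FileData message
    offset: int
    data: bytes


@dataclass
class WriteRequest:  # stand-in for the generated WriteRequest message
    extents: List[FileData] = field(default_factory=list)


@dataclass
class WriteResponse:  # stand-in for the generated WriteResponse message
    throttle: int = 0


def write(request_iterator: Iterable[WriteRequest], f: BinaryIO) -> Iterator[WriteResponse]:
    for request in request_iterator:
        for extent in request.extents:
            f.seek(extent.offset)
            f.write(extent.data)
        # Respond to every request without paying for a flush each time.
        yield WriteResponse(throttle=0)
    # Reached only after the client has closed its request stream.
    f.flush()
    os.fsync(f.fileno())
    # Returning ends the response stream; with grpc the RPC then completes with OK.
```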

"if I don't flush writes to disk before returning the final response the client will potentially read an incomplete file" is suspect - why does your client need a response message in the stream of response messages to know that the server wrote all necessary data to disk? Why can't the client infer from the RPC terminating normally with OK status that the server wrote all necessary data to disk?

This is a very important question that I think deserves an answer. :-)


       
        else:
            yield WriteResponse(throttle=0)

All I'm really trying to accomplish is running some logic at the very end of the RPC before the client unblocks. If I were using the Java implementation, I would have access to ResponseObserver.onCompleted(), but it seems like with Python, RPC completion is just implied by yielding the response that corresponds to the last request in the request iterator.

Is there some way to do this?

This sounds like a use case that we definitely didn't anticipate, but... it's still rather surprising that it could matter somehow whether or not something was done before the end of the stream became known versus after the end of the stream became known.

I suspect that there's something else going on.

(I still suspect that there's something else going on.)
-Nathaniel

David Matson

Apr 11, 2018, 3:12:20 PM
to grpc.io
Turns out if the client properly waits for the connection to close everything works as expected. You were correct that "something else is going on". Thanks for your help!
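For reference, here is a client-side sketch of "properly waiting": keep iterating over the response stream until it ends on its own, rather than stopping once the expected number of responses has arrived. The sketch rests on several assumptions. call stands in for a generated stub method such as stub.Write, upload is a hypothetical helper, and strings replace the messages. The lockstep queue models the original client, which waits for each response before sending the next request.

```python
import queue
from typing import Callable, Iterable, Iterator, List


def upload(call: Callable[[Iterator[str]], Iterable[str]],
           chunks: List[str]) -> List[str]:
    """Hypothetical client helper: send one request per chunk, wait for its
    response before sending the next, then drain the stream to the end."""
    acks: "queue.Queue[str]" = queue.Queue()

    def requests() -> Iterator[str]:
        for chunk in chunks:
            yield chunk
            acks.get()  # lockstep: block until this chunk's response arrives
        # Returning here ends the request stream (the client half-closes).

    received: List[str] = []
    for response in call(requests()):
        received.append(response)
        acks.put(response)
    # Reached only after the server has finished the call, including any work
    # it does after the request stream closed. With grpc, a failed RPC would be
    # expected to raise from the loop above instead of reaching this point.
    return received
```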