Python: Independent server and client request/response cycle (bi-directional streaming)


David Matson

Oct 18, 2018, 1:09:24 PM
to grpc.io
I'm trying to build a Python gRPC service using bi-directional streaming as a method of sending commands from server -> client, to bypass a firewall. The client will typically respond to a command from the server by sending back another request. The first message in the stream from the client simply serves to establish the connection and then any further message in either direction could come totally out of band. Here's roughly what the proto, client, and server look like:

Proto:

service Command {
    rpc Send(stream CommandMessage) returns (stream CommandMessage) {}
}

message CommandMessage {
    google.protobuf.Any task = 1;
}

Server:
   
def Send(self, request_iterator, context):
    q = Queue()

    def response_worker():
        while True:
            # Send commands to client out-of-band
            q.put(CommandMessage(task=...))

    t = Thread(target=response_worker)
    t.daemon = True
    t.start()

    server_response_iterator = ServerResponseIterator(q)
    for request in request_iterator:
        # This is where I run into problems as described below
        # I want to process requests here, maybe put something on the queue in order to respond
        return server_response_iterator

class ServerResponseIterator(object):
    def __init__(self, queue):
        self._queue = queue

    def __iter__(self):
        return self

    def _next(self):
        message = self._queue.get(block=True)
        self._queue.task_done()
        return message

    def __next__(self):  # Python 3
        return self._next()

    def next(self):  # Python 2
        return self._next()

Client:

class ClientRequestIterator(object):
    def __init__(self, queue):
        self._queue = queue
        self._open = False

    def __iter__(self):
        return self

    def _next(self):
        if not self._open:
            self._open = True
            # Send a dummy message to open the connection
            return CommandMessage(task=...)
        else:
            message = self._queue.get(block=True)
            self._queue.task_done()
            return message

    def __next__(self):  # Python 3
        return self._next()

    def next(self):  # Python 2
        return self._next()

if __name__ == '__main__':
    with grpc.insecure_channel('localhost:50051') as channel:
        stub = command_pb2_grpc.CommandStub(channel)

        q = Queue()

        def request_worker():
            while True:
                # Send commands to server out-of-band
                time.sleep(15)
                q.put(CommandMessage(task=...))

        t = Thread(target=request_worker)
        t.daemon = True
        t.start()

        request_iterator = ClientRequestIterator(q)
        responses = stub.Send(request_iterator)
        for response in responses:
            # Respond to commands from server
            q.put(CommandMessage(task=...))


The behavior I'm seeing is that the client side works. It sends the out-of-band messages that the background thread puts on the queue, and it properly consumes messages from the response iterator (commands from the server), puts a command on the queue, and then sends that request. On the server side, however, the server only sees the initial request that opens the connection and never sees another request (it seems to hang in the request iterator). As the server puts out-of-band messages on the queue, it does send responses every 15 seconds (from the response iterator).

So I'm open to the possibility that I'm going about this in entirely the wrong way. Conceptually I think what I want is: 1) a server side thread that can send responses at any time 2) a server side thread that will process requests as they come in, and may or may not send a response 3) a client side thread that can send requests at any time 4) a client side thread that will process responses as they come in and may or may not send a request as a result.

I'm making the assumption that the server doesn't have to send a response for every request it receives, and conversely could send a response without ever having received a request.
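To make the decoupling I'm after concrete, here is a rough, self-contained sketch with no gRPC involved (`serve`, `drain_requests`, and the `ack:` responses are all made up for illustration): the handler is a generator that yields responses from a queue, while a background thread drains the request iterator so the two directions never block each other.

```python
import threading
import queue

def serve(request_iterator):
    """Stream-stream handler sketch: yield responses from a queue while a
    background worker drains the incoming request iterator."""
    out = queue.Queue()

    def drain_requests():
        for request in request_iterator:
            # Process each request; optionally enqueue a response.
            out.put("ack:" + request)
        out.put(None)  # sentinel: client finished sending

    threading.Thread(target=drain_requests, daemon=True).start()

    # Yield whatever any thread puts on the queue.
    while True:
        msg = out.get()
        if msg is None:
            return
        yield msg

responses = list(serve(iter(["open", "cmd1", "cmd2"])))
print(responses)  # ['ack:open', 'ack:cmd1', 'ack:cmd2']
```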

Thanks in advance for any feedback!

li...@google.com

Oct 30, 2018, 9:10:34 PM
to grpc.io
Hi David,

For bi-directional streaming, you are right that the server doesn't have to send a response for every request. In fact, the server can even send a response before it has ever received a request. Both client and server can send messages at any time, since the transport is full-duplex thanks to HTTP/2.
So gRPC can achieve what you want; the critical part is the blocking logic, and its implementation depends heavily on your application logic.
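As a rough illustration of that full-duplex independence (plain Python, no gRPC; `server_handler` and the command strings are invented for this sketch): the handler's responses come entirely from a queue that any thread can feed, while requests are consumed on a separate thread and need not produce any response at all.

```python
import queue
import threading

def server_handler(request_iterator, commands):
    """Responses come solely from `commands`; requests are read on a thread
    and do not have to trigger any response."""
    seen = []

    def reader():
        for request in request_iterator:
            seen.append(request)  # process requests; no response required

    threading.Thread(target=reader, daemon=True).start()

    # Yield whatever any thread puts on the queue, until a None sentinel.
    for command in iter(commands.get, None):
        yield command

commands = queue.Queue()
for c in ("run-task-1", "run-task-2", None):
    commands.put(c)

# The server "sends" two commands despite receiving zero requests.
sent = list(server_handler(iter([]), commands))
print(sent)  # ['run-task-1', 'run-task-2']
```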

Lidi Zheng