I'm trying to build a Python gRPC service using bi-directional streaming as a method of sending commands from server -> client, to bypass a firewall. The client will typically respond to a command from the server by sending back another request. The first message in the stream from the client simply serves to establish the connection and then any further message in either direction could come totally out of band. Here's roughly what the proto, client, and server look like:
Proto:
service Command {
rpc Send(stream CommandMessage) returns (stream CommandMessage ) {}
}
message CommandMessage {
google.protobuf.Any task = 1;
}
Server:
def Send(self, request_iterator, context):
q = Queue()
def response_worker():
while True:
# Send commands to client out-of-band
q.put(CommandMessage(task=...)))
t = Thread(target=response_worker)
t.daemon = True
t.start()
server_response_iterator = ServerResponseIterator(q)
for request in request_iterator:
# This is where I run into problems as described below
# I want to process requests here, maybe put something on the queue in order to respond
return server_response_iterator
class ServerResponseIterator(object):
def __init__(self, queue):
self._queue = queue
def __iter__(self):
return self
def _next(self):
message = self._queue.get(block=True)
self._queue.task_done()
return message
def __next__(self): # Python 3
return self._next()
def next(self): # Python 2
return self._next()
Client:
class ClientRequestIterator(object):
def __init__(self, queue):
self._queue = queue
self._open = False
def __iter__(self):
return self
def _next(self):
if not self._open:
self._open = True
# Send a dummy message to open the connection
return CommandMessage(task=...)
else:
message = self._queue.get(block=True)
self._queue.task_done()
return message
def __next__(self): # Python 3
return self._next()
def next(self): # Python 2
return self._next()
if __name__ == '__main__':
with grpc.insecure_channel('localhost:50051') as channel:
stub = command_pb2_grpc.CommandStub(channel)
q = Queue()
def request_worker():
while True:
# Send commands to server out-of-band
time.sleep(15)
q.put(CommandMessage(task=...)
t = Thread(target=request_worker)
t.daemon = True
t.start()
request_iterator = ClientRequestIterator(q)
responses = stub.Send(request_iterator)
for response in responses:
# Respond to commands from server
q.put(CommandMessage(task=...)
The behavior I'm seeing is the client side works. It will send out of band message that come from the background thread that I put on the queue. It also properly consumes messages from the response iterator (commands from the server), puts a command on the queue and then sends that request. However, on the server side of things the server will only see the initial request that opens the connections and then it will never see another request (it seems hung in the request iterator). As the server puts out of band messages on the queue it does send responses every 15 seconds (from the response iterator).
So I'm open to the possibility that I'm going about this in entirely the wrong way. Conceptually I think what I want is: 1) a server side thread that can send responses at any time 2) a server side thread that will process requests as they come in, and may or may not send a response 3) a client side thread that can send requests at any time 4) a client side thread that will process responses as they come in and may or may not send a request as a result.
I'm making the assumption that the server doesn't have to send a response for every request it receives, and conversely could send a response without every having received a request.
Thanks in advance for any feedback!