I have a use case where the client needs to send some status updates to the server, but I also need the server to be able to notify the client if the server shuts down for any reason. My thought was bi-directional streaming makes the most sense.
The difference in my case is that I don't need to set up multiple RPC calls for this. I could theoretically hold a channel open a channel and send multiple messages fed via a queue.
Consider this abomination:
---- snip ----
import asyncio
import grpc
import bidirectional_pb2_grpc
import bidirectional_pb2
sendq = asyncio.Queue()
recvq = asyncio.Queue()
def make_message(message):
return bidirectional_pb2.Message(
message=message
)
async def generate_messages():
print('Generating')
while True:
message = await sendq.get()
msg = make_message(message)
print(f'Sending {msg.message}')
yield msg
await asyncio.sleep(3600)
async def connect(stub):
responses = stub.GetServerResponse(generate_messages())
while True:
async for response in responses:
recvq.put_nowait(response.message)
async def run():
async with grpc.aio.insecure_channel('localhost:50051') as channel:
stub = bidirectional_pb2_grpc.BidirectionalStub(channel)
asyncio.create_task(connect(stub))
sendq.put_nowait('First message')
sendq.put_nowait('Second message')
sendq.put_nowait('Third message')
sendq.put_nowait('Fourth message')
sendq.put_nowait('Fifth message')
print('Waiting for receive')
while True:
mesg = await recvq.get()
print(f'Received {mesg}')
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(run())
---- snip ----
I wasn't sure what the default timeouts are for grpc are so I added 1 hour delays between each transmission. Things seemed to work OK, but is this ill advised? I haven't seen any examples of anything like this out in the wild.
Thanks,
-Clint