> Is the idea to provide asynchronous / async-friendly APIs on the stub, service, or both?
Based on my experience, I think designing an asynchronous API for RPC services is probably trickier, particularly for streaming replies. The simple, developer-friendly generator idiom for streaming replies doesn't translate to asyncio coroutines, which can only return once. Coroutine generators are not implemented in Python 3.5; see https://www.python.org/dev/peps/pep-0492/#coroutine-generators
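[For contrast, the synchronous generator idiom being referred to looks something like this minimal sketch; `context` stands in for the framework's per-call object, and `some_condition` and `produce_next_item` are placeholders:]

def MethodWithStreamingReplies(request, context):
    # A plain generator: the framework pulls each response message
    # with next(), so the implementation can just yield as it goes.
    while some_condition:
        yield produce_next_item()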
What I think this means is that an asynchronous RPC method with a streaming reply would have to push elements onto a queue or some other object representing a stream, rather than returning them directly. Something like the following:
async def MethodWithStreamingReplies(request, reply_stream):
    while some_condition:
        item = await some_coroutine()
        reply_stream.push(item)  # push each response onto the stream
    reply_stream.close()         # signal end-of-stream
> Nathaniel: That looks like an example of a method implementation.
The stream does not have to be closed explicitly by the implementer (though it should be possible to do so). The stream should always close when the method returns. The server logic would look something like this:
import asyncio
from asyncio import Future
from typing import Tuple

async def invoke_unary_stream(method, request, reply_stream):
    # Run the method, then make sure the stream is closed even if the
    # implementer did not close it explicitly.
    await method(request, reply_stream)
    if not reply_stream.closed():
        reply_stream.close()

def handle_unary_stream_request(method, request) -> Tuple[Future, ReplyStream]:
    reply_stream = ReplyStream()
    return asyncio.ensure_future(invoke_unary_stream(method, request, reply_stream)), reply_stream

The reply stream then has to be consumed by either a different coroutine or a thread that is responsible for sending response messages.

I have written a simple prototype of an asyncio gRPC implementation in hyper-h2 (http://python-hyper.org/projects/h2/en/stable/). I'll be happy to put that on GitHub if anyone is interested.
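[ReplyStream is never defined in this thread. Purely as an illustration, here is one possible shape for it as a thin wrapper around asyncio.Queue, together with the consuming coroutine described above; the sentinel-based close and the async-iteration protocol are assumptions, not part of any real gRPC API:]

import asyncio

class ReplyStream:
    """One possible shape for the ReplyStream above: a thin wrapper
    around asyncio.Queue. Purely illustrative; not a real gRPC class."""

    _END = object()  # sentinel enqueued by close() to wake the consumer

    def __init__(self):
        self._queue = asyncio.Queue()
        self._closed = False

    def push(self, item):
        if self._closed:
            raise RuntimeError("push() after close()")
        self._queue.put_nowait(item)

    def close(self):
        if not self._closed:
            self._closed = True
            self._queue.put_nowait(self._END)

    def closed(self):
        return self._closed

    def __aiter__(self):
        return self

    async def __anext__(self):
        item = await self._queue.get()
        if item is self._END:
            raise StopAsyncIteration
        return item

async def send_replies(reply_stream, send_message):
    # The consuming coroutine: forwards each streamed message to
    # send_message, a stand-in for whatever serializes and writes
    # the response to the wire.
    async for item in reply_stream:
        send_message(item)

[Enqueuing a sentinel on close() lets the consumer distinguish end-of-stream from an empty-but-still-open queue without polling closed().]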
I don't think it is particularly difficult to get gRPC working with asyncio in principle. The main issue that needs to be addressed with the official gRPC implementation is that there is a thread for each method call, and messages related to a particular RPC call are sent/received in a blocking manner in the same thread where the method call is happening.

It is straightforward to write a coroutine method implementation today, but the exercise is pointless because each RPC call has its own thread. There should be some dedicated threads for receiving and sending messages, and then a dedicated thread for the event loop. Then handling a request is just a matter of invoking the method on the event loop and passing any streamed messages between the threads (using a queue or some other mechanism).
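[A rough sketch of that arrangement, assuming the ReplyStream and invoke_unary_stream from above; the handler name here is illustrative, while asyncio.run_coroutine_threadsafe is the standard primitive for handing work from another thread to the event loop:]

import asyncio
import threading

# One dedicated thread runs the asyncio event loop; the server's
# send/receive threads hand RPCs over to it.
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

def handle_unary_stream_from_io_thread(method, request):
    # Called on one of the server's I/O threads: schedule the coroutine
    # method on the event-loop thread and hand back a
    # concurrent.futures.Future plus the stream to drain.
    # (Note: before Python 3.10, asyncio.Queue binds a loop when it is
    # constructed, so a real server would create the ReplyStream on the
    # loop thread instead.)
    reply_stream = ReplyStream()
    future = asyncio.run_coroutine_threadsafe(
        invoke_unary_stream(method, request, reply_stream), loop)
    return future, reply_stream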
On 08 Feb 2017, at 01:40, claw...@gmail.com wrote:
An H2-based gRPC implementation sounds interesting. Did you get around to posting your H2-based experiment to GitHub? If so, could you provide a link?
On Sun, Sep 25, 2016 at 8:27 AM, <la...@lyschoening.de> wrote:

> Nathaniel: That looks like an example of a method implementation.

I think I'm so used to seeing the messages be protocol buffer objects that a simple "item" threw me off. :-P

> The stream does not have to be closed explicitly by the implementer (though it should be possible to do so). The stream should always close when the method returns. The server logic would look something like this:
>
> async def invoke_unary_stream(method, request, reply_stream):
>     await method(request, reply_stream)
>     if not reply_stream.closed():
>         reply_stream.close()
>
> def handle_unary_stream_request(method, request) -> Tuple[Future, ReplyStream]:
>     reply_stream = ReplyStream()
>     return asyncio.ensure_future(invoke_unary_stream(method, request, reply_stream)), reply_stream
>
> The reply stream then has to be consumed by either a different coroutine or a thread that is responsible for sending response messages. I have written a simple prototype of an asyncio gRPC implementation in hyper-h2 (http://python-hyper.org/projects/h2/en/stable/). I'll be happy to put that on GitHub if anyone is interested.

Sure!

> I don't think it is particularly difficult to get gRPC working with asyncio in principle. The main issue that needs to be addressed with the official gRPC implementation is that there is a thread for each method call

Agreed.

> and messages related to a particular RPC call are sent/received in a blocking manner

Agreed.

> in the same thread where the method call is happening.

Are you sure? I think the thread in which the method call is happening is made to wait while the server's own thread sends and receives messages on the wire.

> It is straightforward to write a coroutine method implementation today, but the exercise is pointless because each RPC call has its own thread. There should be some dedicated threads for receiving and sending messages, and then a dedicated thread for the event loop. Then handling a request is just a matter of invoking the method on the event loop and passing any streamed messages between the threads (using a queue or some other mechanism).

Sounds reasonable.

-N