Canonical duplex async rpc example?

14 views
Skip to first unread message

Matt Billenstein

unread,
Jun 9, 2023, 12:51:04 AM6/9/23
to gev...@googlegroups.com
Hello, does anyone have an example of this or could audit something I've put
together?

I have a StreamServer that accepts connections from a client and then does
duplex async msgpack rpc - the server can call the client and the client can
call the server. Messages can return responses out of order.

Only one greenlet reads from the socket and only one writes - I serialize
writes using a Queue and I've ended up wrapping all the socket read/sends in
gevent.Timeouts -- ssl sockets or perhaps sockets in general can just seem to
hang forever. I also have a greenlet on the client that sends pings over the
socket every 5s, if I don't see a pong quickly enough in the client, I consider
the connection dead and reconnect.

https://github.com/mattbillenstein/salty/blob/master/salty.py#L114

This works, but seems complex - if anyone knows a more canonical implementation
of this or has an pointers - thanks in advance!

m

--
Matt Billenstein
ma...@vazor.com
https://vazor.com

Aleksandar Kordic

unread,
Jun 9, 2023, 6:02:09 AM6/9/23
to gev...@googlegroups.com
Hello Matt,

You are on the right track. To address code complexity i would apply refactor methods from this article https://www.fluentcpp.com/2016/12/15/respect-levels-of-abstraction/ .  I don't see much wrong with your gevent usage, except to be more pedantic in your final code.
 
Only one greenlet reads from the socket and only one writes

Good rule, but your ping & pong logic violate this rule. 
  
ssl sockets or perhaps sockets in general can just seem to
hang forever.
 
When writing gevent code using standard io functions is forbidden. The `gevent.monkey.patch_all()` is not reliable. In practice it takes less time to refactor existing code to use gevent.* compared to hunting what causes coroutine starvation.

For example using [sleep() from time](https://github.com/mattbillenstein/salty/blob/9e399eafd6e8eeafc60a112691fb8d8fda0d4b36/salty.py#LL95C26-L95C26) instead of [gevent.sleep](https://www.gevent.org/api/gevent.html#sleeping) won't raise an exception, the bug is subtle: entire gevent machinery pauses for 5 seconds. In this case `monkey.patch_all()` fixes sleep here to use `gevent.sleep` but if you are noticing `sockets can just seem to hang forever` then somewhere in the code is standard io that blocks the entire gevent hub.

Alex

Matt Billenstein

unread,
Jun 9, 2023, 11:58:00 AM6/9/23
to gev...@googlegroups.com
Hello Alex, thanks for taking a look.

On Fri, Jun 09, 2023 at 11:58:24AM +0200, Aleksandar Kordic wrote:
>
> Only one greenlet reads from the socket and only one writes
>
>
> Good rule, but your ping & pong logic violate this rule. 

I think you'll notice that function and the handlers write to the Queue, not to
the socket; only the greenlet _writer writes to the socket. Only the handle
function itself reads.

> ssl sockets or perhaps sockets in general can just seem to
> hang forever.
>
>  
> When writing gevent code using standard io functions is forbidden. The
> `gevent.monkey.patch_all()` is not reliable. In practice it takes less time to
> refactor existing code to use gevent.* compared to hunting what causes
> coroutine starvation.

This is news to me - I thought patching was the best way to ensure you were
patching code you don't own in libraries and so forth?

thx
Reply all
Reply to author
Forward
0 new messages