Hi,
I have a use case where multiple clients connect to a server. A call from a client to the server might require the server to call another client. For some functionality, a direct call from one client to another would be best. For performance reasons, some clients will be written in C++. For simplicity, the server and the other clients will be written in Python. Is this possible? In my attempt below, the client call never returns. In the example, the client calls the manager, which in turn calls the provider.
poc.capnp
@0xdb7253e3ef44cbf9;
# Interface the Python scripts obtain as the bootstrap capability of the manager.
interface Manager {
# Hands a Service capability to the manager; returns nothing.
registerService @0 (service :Service) -> ();
# Returns a Service capability to the caller.
getService @1 () -> (service :Service);
# Returns a single 16-bit unsigned value.
get @2 () -> (val :UInt16);
# Nested interface for the capability that is passed to registerService
# and returned by getService.
interface Service {
# Returns a single 16-bit unsigned value.
get @0 () -> (val :UInt16);
}
}
client.py
#!/usr/bin/python3
import sys
import capnp
import poc_capnp
def start_client(host):
    """Connect to the manager at *host* and exercise its RPC interface.

    Prints the response of ``Manager.get``, then fetches the ``Service``
    capability through ``Manager.getService`` and prints the response of a
    direct ``Service.get`` call on it.

    Args:
        host: Address string handed unchanged to ``capnp.TwoPartyClient``.
    """
    client = capnp.TwoPartyClient(host)
    ms = client.bootstrap().cast_as(poc_capnp.Manager)
    print(ms.get().wait())
    # wait() yields the whole result struct of getService; the capability is
    # its ``service`` field, so unwrap it before calling methods on it.
    service = ms.getService().wait().service
    print('got service')
    print(service.get().wait())
if __name__ == "__main__":
    # Run only when executed as a script, so importing this module does not
    # start the client as a side effect.
    start_client(sys.argv[1])
provider.py
#!/usr/bin/python3
import sys
import capnp
import poc_capnp
import asyncio
class Provider(poc_capnp.Manager.Service.Server):
    """``Service`` implementation that hands out a wrapping counter."""

    def __init__(self):
        # Last value handed out; starts at zero.
        self.counter = 0

    def get(self, **kwargs):
        """Print the current counter, advance it modulo 10000, return the new value."""
        print(self.counter)
        advanced = (self.counter + 1) % 10000
        self.counter = advanced
        return advanced
async def socket_reader(client, reader):
    """Forward bytes received on the socket into the capnp client.

    Reads up to 4096 bytes at a time from *reader*, retrying after each
    one-second timeout, and passes every non-empty chunk to
    ``client.write``.

    Args:
        client: Object whose ``write(data)`` accepts the received bytes.
        reader: Stream whose ``read(n)`` coroutine yields bytes and returns
            an empty bytes object at end of stream.

    Returns:
        None, once the peer has closed the connection.
    """
    while True:
        try:
            data = await asyncio.wait_for(reader.read(4096), timeout=1.0)
        except asyncio.TimeoutError:
            # Nothing arrived within the timeout; poll again.
            continue
        if not data:
            # An empty read signals EOF. Without this check the loop would
            # spin forever pushing empty chunks after a disconnect.
            return
        client.write(data)
async def socket_writer(client, writer):
    """Forward bytes produced by the capnp client out to the socket.

    Reads up to 4096 bytes at a time from ``client.read``, retrying after
    each one-second timeout, and writes every chunk to *writer*.

    Args:
        client: Object whose ``read(n)`` coroutine yields a buffer exposing
            ``tobytes()``.
        writer: Stream writer with a synchronous ``write`` and an awaitable
            ``drain``.
    """
    while True:
        try:
            data = await asyncio.wait_for(client.read(4096), timeout=1.0)
        except asyncio.TimeoutError:
            # Nothing to send within the timeout; poll again.
            continue
        # write() is a plain method that returns None, so awaiting it raises
        # TypeError and ends this pump after the first chunk. Buffer the
        # bytes, then await drain() to let the transport flush.
        writer.write(data.tobytes())
        await writer.drain()
async def start_ipc(host):
    """Connect to the manager, register a ``Provider`` and keep serving it.

    Opens a TCP connection to *host*, starts the two byte pumps between the
    socket and the capnp client, registers a ``Provider`` with the manager
    and then waits on the pumps.

    Args:
        host: ``"address:port"`` string; it is split on ``':'`` and the
            parts are passed to ``asyncio.open_connection``.

    Raises:
        asyncio.TimeoutError: If the connection is not established within
            one second.
        Exception: Any error raised by either pump or by the registration
            call propagates instead of being silently discarded.
    """
    client = capnp.TwoPartyClient()
    ms = client.bootstrap().cast_as(poc_capnp.Manager)
    reader, writer = await asyncio.wait_for(
        asyncio.open_connection(*host.split(':')),
        timeout=1.0,
    )
    # Keep a reference to the gather future and await it below: a discarded
    # future created with return_exceptions=True hides every pump failure.
    pumps = asyncio.gather(
        socket_reader(client, reader),
        socket_writer(client, writer),
    )
    try:
        await ms.registerService(Provider()).a_wait()
        # Stay alive for as long as the pumps run.
        await pumps
    finally:
        pumps.cancel()
        writer.close()
if __name__ == "__main__":
    # Run only when executed as a script, so importing this module does not
    # start the event loop as a side effect.
    asyncio.run(start_ipc(sys.argv[1]))
manager.py
#!/usr/bin/python3
import sys
import capnp
import poc_capnp
import asyncio
# Capability most recently passed to ManagerImpl.registerService;
# stays None until a registration happens.
g_service = None
class ManagerImpl(poc_capnp.Manager.Server):
    """Server-side implementation of the ``Manager`` interface.

    The registered capability is kept in the module-level ``g_service``.
    """

    def __init__(self):
        pass

    def registerService(self, service, **kwargs):
        """Remember *service* as the currently registered capability."""
        global g_service
        g_service = service
        print('service registered')

    def getService(self, **kwargs):
        """Return the registered capability (``None`` if nothing registered)."""
        print('service retrieved')
        return g_service

    def get(self, _context, **kwargs):
        """Forward the call to the registered service and relay its value.

        Returns:
            A promise that fills ``_context.results.val`` once the registered
            service has answered.

        Raises:
            RuntimeError: If no service has been registered yet.
        """
        print('service called')
        if g_service is None:
            # Fail with an explicit message instead of an AttributeError
            # raised on None.
            raise RuntimeError('no service has been registered')
        # The callback receives the response struct of Service.get, not the
        # bare integer, so copy its ``val`` field into our own results.
        return g_service.get().then(
            lambda response: setattr(_context.results, "val", response.val)
        )
# -------------------------------------------------------
class Server:
    """Bridges one asyncio stream connection to a capnp ``TwoPartyServer``.

    ``myserver`` sets up the per-connection state and drives the server;
    ``myreader`` and ``mywriter`` move bytes between the socket streams and
    the capnp server while ``self.retry`` is true.
    """

    async def myreader(self):
        """Feed bytes from the socket into the capnp server.

        Returns:
            True when the loop ended because ``self.retry`` was cleared,
            False when an unexpected error interrupted it.
        """
        while self.retry:
            try:
                # Must be a wait_for so we don't block on read()
                data = await asyncio.wait_for(
                    self.reader.read(4096),
                    timeout=0.1,
                )
            except asyncio.TimeoutError:
                continue
            except Exception as err:
                # print() does not interpolate "%s"; format explicitly.
                print(f"Unknown myreader err: {err}")
                return False
            await self.server.write(data)
        print("myreader done.")
        return True

    async def mywriter(self):
        """Send bytes produced by the capnp server out on the socket.

        Returns:
            True when the loop ended because ``self.retry`` was cleared,
            False when an unexpected error interrupted it.
        """
        while self.retry:
            try:
                # Must be a wait_for so we don't block on read()
                data = await asyncio.wait_for(
                    self.server.read(4096),
                    timeout=0.1,
                )
                self.writer.write(data.tobytes())
                # Let the transport flush before buffering more output.
                await self.writer.drain()
            except asyncio.TimeoutError:
                continue
            except Exception as err:
                # print() does not interpolate "%s"; format explicitly.
                print(f"Unknown mywriter err: {err}")
                return False
        print("mywriter done.")
        return True

    async def myserver(self, reader, writer):
        """Serve one connection until the peer disconnects.

        Args:
            reader: Stream reader of the accepted connection.
            writer: Stream writer of the accepted connection.
        """
        # Start TwoPartyServer using TwoWayPipe (only requires bootstrap)
        self.server = capnp.TwoPartyServer(bootstrap=ManagerImpl())
        self.reader = reader
        self.writer = writer
        self.retry = True

        # Run the reader and writer pumps in the background.
        tasks = asyncio.gather(
            self.myreader(),
            self.mywriter(),
            return_exceptions=True,
        )
        try:
            while True:
                self.server.poll_once()
                # Check to see if reader has been sent an eof (disconnect)
                if self.reader.at_eof():
                    print('close connection')
                    break
                await asyncio.sleep(0.01)
        finally:
            # Stop the pumps even if polling raised, wait for them to finish,
            # then release the socket so the connection is not leaked.
            self.retry = False
            await tasks
            self.writer.close()
async def new_connection(reader, writer):
    """Handle one accepted connection by running a fresh ``Server`` on it."""
    print('new connection')
    handler = Server()
    serving = handler.myserver(reader, writer)
    await serving
async def run_server(host):
    """Listen on *host* and serve connections until interrupted.

    *host* is split on ``':'`` and the parts are passed positionally to
    ``asyncio.start_server`` after the connection callback.
    """
    endpoint = host.split(':')
    listener = await asyncio.start_server(new_connection, *endpoint)
    print('server started')
    async with listener:
        await listener.serve_forever()
if __name__ == "__main__":
    # Run only when executed as a script, so importing this module does not
    # start listening as a side effect.
    asyncio.run(run_server(sys.argv[1]))