Hi,
I have a use case where multiple clients connect to a server. A call from a client to the server might require the server to call another client. For some functionality a direct call from one client to another would be best. For performance reasons some clients will be written in C++. For simplicity the server and other clients will be written in Python. Is this possible? In my attempt below the client call never returns. In the example client calls manager which calls provider.
poc.capnp
@0xdb7253e3ef44cbf9;
interface Manager {
  registerService @0 (service :Service) -> ();
  getService @1 () -> (service :Service);
  get @2 () -> (val :UInt16);
  interface Service {
    get @0 () -> (val :UInt16);
  }
}
client.py
#!/usr/bin/python3
import sys
import capnp
import poc_capnp
def start_client(host):
  client = capnp.TwoPartyClient(host)
  ms = client.bootstrap().cast_as(poc_capnp.Manager)
  print(ms.get().wait())
  service = ms.getService().wait()
  print('got service')
  print(service.get().wait())
start_client(sys.argv[1])
#!/usr/bin/python3
import sys
import capnp
import poc_capnp
import asyncio
class Provider(poc_capnp.Manager.Service.Server):
  def __init__(self):
    self.counter = 0
  def get(self, **kwargs):
    print(self.counter)
    self.counter = (self.counter + 1) % 10000
    return self.counter
async def socket_reader(client, reader):
  while True:
    try:
      data = await asyncio.wait_for(reader.read(4096), timeout=1.0)
      client.write(data)
    except asyncio.TimeoutError:
      pass
async def socket_writer(client, writer):
  while True:
    try:
      data = await asyncio.wait_for(client.read(4096), timeout=1.0)
      await writer.write(data.tobytes())
    except asyncio.TimeoutError:
      pass
async def start_ipc(host):
  client = capnp.TwoPartyClient()
  ms = client.bootstrap().cast_as(poc_capnp.Manager)
  reader, writer = await asyncio.wait_for(asyncio.open_connection(*host.split(':')), timeout=1.0, )
  coroutines = [socket_reader(client, reader), socket_writer(client, writer)]
  asyncio.gather(*coroutines, return_exceptions=True)
  await ms.registerService(Provider()).a_wait()
  while True:
    await asyncio.sleep(1)
asyncio.run(start_ipc(sys.argv[1]))
manager.py
#!/usr/bin/python3
import sys
import capnp
import poc_capnp
import asyncio
g_service = None
class ManagerImpl(poc_capnp.Manager.Server):
  def __init__(self):
    pass
  def registerService(self, service, **kwargs):
    global g_service
    g_service = service
    print('service registered')
  def getService(self, **kwargs):
    global g_service
    print('service retrieved')
    return g_service
  def get(self, _context, **kwargs):
    global g_service
    print('service called')
    return g_service.get().then(
      lambda value: setattr(_context.results, "val", value)
    )
# -------------------------------------------------------
class Server:
  async def myreader(self):
    while self.retry:
      try:
        # Must be a wait_for so we don't block on read()
        data = await asyncio.wait_for(
          self.reader.read(4096),
          timeout=0.1
        )
      except asyncio.TimeoutError:
        # print("myreader timeout.")
        continue
      except Exception as err:
        print("Unknown myreader err: %s", err)
        return False
      await self.server.write(data)
    print("myreader done.")
    return True
  async def mywriter(self):
    while self.retry:
      try:
        # Must be a wait_for so we don't block on read()
        data = await asyncio.wait_for(
          self.server.read(4096),
          timeout=0.1
        )
        self.writer.write(data.tobytes())
      except asyncio.TimeoutError:
        # print("mywriter timeout.")
        continue
      except Exception as err:
        print("Unknown mywriter err: %s", err)
        return False
    print("mywriter done.")
    return True
  async def myserver(self, reader, writer):
    # Start TwoPartyServer using TwoWayPipe (only requires bootstrap)
    self.server = capnp.TwoPartyServer(bootstrap=ManagerImpl())
    self.reader = reader
    self.writer = writer
    self.retry = True
    # Assemble reader and writer tasks, run in the background
    coroutines = [self.myreader(), self.mywriter()]
    tasks = asyncio.gather(*coroutines, return_exceptions=True)
    while True:
      self.server.poll_once()
      # Check to see if reader has been sent an eof (disconnect)
      if self.reader.at_eof():
        self.retry = False
        print('close connection')
        break
      await asyncio.sleep(0.01)
    # Make wait for reader/writer to finish (prevent possible resource leaks)
    await tasks
async def new_connection(reader, writer):
  print('new connection')
  server = Server()
  await server.myserver(reader, writer)
async def run_server(host):
  server = await asyncio.start_server(new_connection, *host.split(':'))
  print('server started')
  async with server:
    await server.serve_forever()
asyncio.run(run_server(sys.argv[1]))