client to client communication

135 views
Skip to first unread message

Leon Wessels

unread,
Oct 14, 2021, 3:39:12 AM10/14/21
to capn...@googlegroups.com
Hi,

I have a use case where multiple clients connect to a server. A call from a client to the server might require the server to call another client. For some functionality a direct call from one client to another would be best. For performance reasons some clients will be written in C++. For simplicity the server and other clients will be written in Python. Is this possible? In my attempt below the client call never returns. In the example client calls manager which calls provider.

poc.capnp
@0xdb7253e3ef44cbf9;

interface Manager {
    registerService @0 (service :Service) -> ();
    getService @1 () -> (service :Service);
    get @2 () -> (val :UInt16);

    interface Service {
        get @0 () -> (val :UInt16);
    }
}

client.py
#!/usr/bin/python3

import sys
import capnp
import poc_capnp


def start_client(host):
    client = capnp.TwoPartyClient(host)
    ms = client.bootstrap().cast_as(poc_capnp.Manager)
    print(ms.get().wait())
    service = ms.getService().wait()
    print('got service')
    print(service.get().wait())


start_client(sys.argv[1])

provider.py
#!/usr/bin/python3

import sys
import capnp
import poc_capnp
import asyncio


class Provider(poc_capnp.Manager.Service.Server):
    def __init__(self):
        self.counter = 0

    def get(self, **kwargs):
        print(self.counter)
        self.counter = (self.counter + 1) % 10000
        return self.counter


async def socket_reader(client, reader):
    while True:
        try:
            data = await asyncio.wait_for(reader.read(4096), timeout=1.0)
            client.write(data)
        except asyncio.TimeoutError:
            pass


async def socket_writer(client, writer):
    while True:
        try:
            data = await asyncio.wait_for(client.read(4096), timeout=1.0)
            await writer.write(data.tobytes())
        except asyncio.TimeoutError:
            pass


async def start_ipc(host):
    client = capnp.TwoPartyClient()
    ms = client.bootstrap().cast_as(poc_capnp.Manager)

    reader, writer = await asyncio.wait_for(asyncio.open_connection(*host.split(':')), timeout=1.0, )
    coroutines = [socket_reader(client, reader), socket_writer(client, writer)]
    asyncio.gather(*coroutines, return_exceptions=True)

    await ms.registerService(Provider()).a_wait()
    while True:
        await asyncio.sleep(1)


asyncio.run(start_ipc(sys.argv[1]))

manager.py
#!/usr/bin/python3

import sys
import capnp
import poc_capnp
import asyncio


g_service = None


class ManagerImpl(poc_capnp.Manager.Server):
    def __init__(self):
        pass

    def registerService(self, service, **kwargs):
        global g_service
        g_service = service
        print('service registered')

    def getService(self, **kwargs):
        global g_service
        print('service retrieved')
        return g_service

    def get(self, _context, **kwargs):
        global g_service
        print('service called')
        return g_service.get().then(
            lambda value: setattr(_context.results, "val", value)
        )


# -------------------------------------------------------


class Server:
    async def myreader(self):
        while self.retry:
            try:
                # Must be a wait_for so we don't block on read()
                data = await asyncio.wait_for(
                    self.reader.read(4096),
                    timeout=0.1
                )
            except asyncio.TimeoutError:
                # print("myreader timeout.")
                continue
            except Exception as err:
                print("Unknown myreader err: %s", err)
                return False
            await self.server.write(data)
        print("myreader done.")
        return True

    async def mywriter(self):
        while self.retry:
            try:
                # Must be a wait_for so we don't block on read()
                data = await asyncio.wait_for(
                    self.server.read(4096),
                    timeout=0.1
                )
                self.writer.write(data.tobytes())
            except asyncio.TimeoutError:
                # print("mywriter timeout.")
                continue
            except Exception as err:
                print("Unknown mywriter err: %s", err)
                return False
        print("mywriter done.")
        return True

    async def myserver(self, reader, writer):
        # Start TwoPartyServer using TwoWayPipe (only requires bootstrap)
        self.server = capnp.TwoPartyServer(bootstrap=ManagerImpl())
        self.reader = reader
        self.writer = writer
        self.retry = True

        # Assemble reader and writer tasks, run in the background
        coroutines = [self.myreader(), self.mywriter()]
        tasks = asyncio.gather(*coroutines, return_exceptions=True)

        while True:
            self.server.poll_once()
            # Check to see if reader has been sent an eof (disconnect)
            if self.reader.at_eof():
                self.retry = False
                print('close connection')
                break
            await asyncio.sleep(0.01)

        # Make wait for reader/writer to finish (prevent possible resource leaks)
        await tasks


async def new_connection(reader, writer):
    print('new connection')
    server = Server()
    await server.myserver(reader, writer)


async def run_server(host):
    server = await asyncio.start_server(new_connection, *host.split(':'))
    print('server started')
    async with server:
        await server.serve_forever()


asyncio.run(run_server(sys.argv[1]))

client.py
manager.py
provider.py
poc.capnp

Kenton Varda

unread,
Oct 15, 2021, 10:23:46 AM10/15/21
to Leon Wessels, Jacob Alexander, Cap'n Proto
Hi Leon,

Abstractly, what you describe should be easy with Cap'n Proto. One client can send a capability (interface reference) to the server, and the server can freely send that capability on to some other client. Cap'n Proto will automatically arrange to proxy messages from one client to the other, through the server. (Someday, three-party handoff will allow the clients to form a direct connection to each other... when we get around to implementing it.)

I am not very familiar with the Python implementation so I'm not sure I can help debug what specifically is wrong here. Maybe Jacob (cc'd) can help.

-Kenton

--
You received this message because you are subscribed to the Google Groups "Cap'n Proto" group.
To unsubscribe from this group and stop receiving emails from it, send an email to capnproto+...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/capnproto/CALao4Qunk9UfFyPjqncxxvp4_gStev95%2Bjzw8KGL5TLpG0CeFg%40mail.gmail.com.

Jacob Alexander

unread,
Oct 16, 2021, 12:31:21 AM10/16/21
to Leon Wessels, Cap'n Proto, Kenton Varda
I'll try to poke at the code this weekend. But what you're trying to do "should" work.

One thing I've noticed is that when things hang with Python asyncio it usually means something is wrong with whatever is polling the tx on the sending side or the rx on the receiving side. If anything died prematurely things will tend to just hang.
I'd recommend trying to debug the asyncio as that's likely where the problem is (rather than a specific pycapnp issue).

-Jacob

Leon Wessels

unread,
Oct 18, 2021, 1:44:30 AM10/18/21
to Cap'n Proto
Thanks for the info. I'll look into it and post an update.

Leon Wessels

unread,
Oct 19, 2021, 10:03:12 AM10/19/21
to Cap'n Proto
I've added a patch that solves my issue for future reference.
patch.diff
Reply all
Reply to author
Forward
0 new messages