Allow async client connections in pub/sub server

117 views
Skip to first unread message

Dan WIllans

unread,
Apr 8, 2022, 3:30:22 PM4/8/22
to Cap'n Proto
Hi all,

I've been having fun playing around with Cap'n Proto and attempting to create simple subscriber/publisher classes using Cap'n Proto RPC and KJ.

I've created simple server/client pub/sub applications that use TwoPartyServer and TwoPartyClient classes. I've attempted to wrap the server app I made into it's own class using the constructor to setup the connection/listening logic. My current method has been to copy the implementation of the TwoPartyServer code and modify where needed.

The issue I'm facing is keeping the `listen()` promise alive to accept new incoming connections whilst I continue to do other work in the rest of the application.

Code for my normal working application:
```
kj::UnixEventPort::captureSignal(SIGINT);
auto io_context = kj::setupAsyncIo();
auto addrPromise = io_context.provider->getNetwork().parseAddress("unix:/tmp/capnp-server-example");
auto addr = addrPromise.wait(io_context.waitScope);
auto addrListen = addr->listen();
capnp::TwoPartyServer server(kj::heap<PublisherImpl<capnp::Text>>(sub_map));
auto server_listen = server.listen(*addrListen);
auto& timer = io_context.provider->getTimer();
auto pub = publishLoop(timer, 1);
io_context.unixEventPort.onSignal(SIGINT).wait(io_context.waitScope);
std::cout << "Shutting down Publisher\n";
```

Code for my wrapped class application constructor (the rest of the class is effectively the TwoPartyServer code):
```
template <typename T>
Publisher<T>::Publisher(const std::string& connection_address, kj::AsyncIoContext& io_context)
: tasks_(*this), connection_address_(connection_address), io_context_(io_context) {
auto listener = io_context_.provider->getNetwork().parseAddress(connection_address);
auto addr = listener.wait(io_context.waitScope);
auto addrListen = addr->listen();
addTask(listen(*addrListen));
}
```

This builds fine but I get an exception when I run the server.
```
error: exception = kj/async.c++:2714: failed: PromiseFulfiller was destroyed without fulfilling the promise.
```

I'm assuming the normal main application works because the `listen()` promise remains in scope but when I attempt to add the `kj::Promise<void>` to the member variable kj::TaskSet something isn't quite right.

My ideal situation would be this.
```
int main(int argc, const char** argv) {
    // setup event loop etc.
    auto io_context = kj::setupAsyncIO();
    // Instantiate a publisher that accepts incoming connections asynchronously.
    auto pub = Publisher<capnp::Text>("unix:/tmp/capnp-server-example", io_context);
    // Start publishing at a specific frequency. Implementation involves a kj::Timer with afterDelay().
    pub.publishAtFrequency("Hello Subscribers", 1);
    // Spin on this publisher. It should continue to accept new subscribers whilst publishing.
    pub.spin()
}
```

I hope I've given enough context and information. Please let me know if you need more information and thank you for this awesome library!

Dan


Reply all
Reply to author
Forward
0 new messages