capnproto RPC and boost asio

shu...@gmail.com

Mar 7, 2016, 11:27:52 AM
to Cap'n Proto
All,
I'm trying to integrate capnproto (including the RPC) into an existing program which uses asio as its event loop.
Looking for pointers, I found this Stack Overflow thread, where Kenton stated:
"Note that if both your client and server are C++, you may want to consider using Cap'n Proto's own RPC protocol, which is easier to set up and already avoids all unnecessary copies. However, integrating Cap'n Proto's event loop with boost::asio is currently non-trivial. It's possible -- for example you can look at node-capnp which integrates Cap'n Proto with libuv's event loop -- but may be more work than you want to do."

  1. Has anyone done this in the meantime, and can you share the outcome or the main steps?
  2. Note that I don't have any preference regarding which event loop is the 'external' one. I'm OK with wrapping the KJ event loop in boost or vice versa, whichever is simpler.

I'd be grateful for any comments.
Thanks,
Avi

Kenton Varda

Mar 7, 2016, 11:43:44 PM
to shu...@gmail.com, Cap'n Proto
Hi Avi,

FWIW, in addition to the libuv compat layer (in node-capnp, which you linked), there are also these two pull requests implementing compatibility with some other event loops:

https://github.com/sandstorm-io/capnproto/pull/253 -- Qt

Unfortunately neither is asio and neither is complete, but maybe these help show how this is done.

-Kenton

Avi Shua

Mar 8, 2016, 6:34:49 AM
to Kenton Varda, Cap'n Proto
Hi Kenton,
Thanks, I've reviewed them and am planning to start implementing this.
But while reviewing the EventPort interface, I failed to understand something that I believe is basic:
- Event loops usually boil down to an OS-level 'select', 'epoll', WaitForMultipleObjects, or other function that blocks until ONE event happens (timer, read from socket, ...).
- Therefore, TTBOMK, combining event loops means that one of the event loops needs to expose its innermost waitable objects, while the second waits on them as well and notifies the first when they are triggered. I could think of other designs, but I don't see how two threads can share an event loop without waiting on the combined list of waitable objects.
- I see that the EventPort interface exposes a 'wait' function which is supposed to block, and I assume this is where the second event loop's wait function should be called. But assuming that is how it is done, how would the KJ event loop be triggered to handle an asynchronous response (e.g. an RPC response)?

Again, thanks for your time and effort.

Avi

Kenton Varda

Mar 8, 2016, 3:58:10 PM
to Avi Shua, Cap'n Proto
Hi Avi,

The idea here is that the other library -- asio in your case -- would be the "primary" event loop, and the purpose of implementing EventPort is to adapt KJ's event loop to delegate to it.

EventPort::wait() should call whatever asio's function is to "wait for next event then dispatch all events". Note that you only need to implement this method if you care about being able to use Promise::wait(), which usually only makes sense in the main() function of client apps. You can make this throw an exception if your code is purely async.

EventPort::poll() should call asio's function to "check for pending I/O and dispatch all pending events, but don't wait for more". However, poll() only exists to avoid possible starvation in the case that the CPU is saturated with events. Most correctly-functioning programs never saturate the CPU and therefore poll() can safely be a no-op.

Thus the only thing you really need to implement is EventPort::setRunnable(), which the KJ event loop calls to indicate when it has events queued. When this happens, you need to queue a callback at the asio level that will invoke kj::EventLoop::run() in the near future.

The trickier part in all of this is implementing AsyncIoStream in terms of asio: You need to provide an implementation of AsyncIoStream wrapping (probably) a file descriptor which performs reads and writes through asio.

Once you have AsyncIoStream implemented, then you can set up Cap'n Proto RPC over it via capnp/rpc-twoparty.h.

-Kenton
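
For readers following along, the delegation described above can be modeled without either library. The sketch below is a toy under stated assumptions: `ToyPrimaryLoop` stands in for the role asio plays, `ToySecondaryLoop` for the role of the KJ event loop, and `ToyPort` for an `EventPort`-style adapter. All of these names, and `runToyDemo`, are hypothetical; none of this is the real KJ or asio API.

```cpp
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Toy stand-in for the primary loop (the role asio plays in this thread).
class ToyPrimaryLoop {
public:
  void post(std::function<void()> task) { tasks_.push_back(std::move(task)); }

  // Runs queued tasks, including ones posted while running, until none remain.
  void runAll() {
    while (!tasks_.empty()) {
      auto task = std::move(tasks_.front());
      tasks_.pop_front();
      task();
    }
  }

private:
  std::deque<std::function<void()>> tasks_;
};

// Toy stand-in for the secondary loop (the role of the KJ event loop).
class ToySecondaryLoop {
public:
  // Hook playing the role of EventPort::setRunnable().
  std::function<void(bool)> setRunnable;

  void enqueue(std::function<void()> event) {
    bool wasEmpty = events_.empty();
    events_.push_back(std::move(event));
    if (wasEmpty && setRunnable) setRunnable(true);
  }

  // Drains the queue, then reports that nothing is left to run.
  void run() {
    while (!events_.empty()) {
      auto event = std::move(events_.front());
      events_.pop_front();
      event();
    }
    if (setRunnable) setRunnable(false);
  }

private:
  std::deque<std::function<void()>> events_;
};

// Hypothetical adapter: when the secondary loop becomes runnable, post one
// callback on the primary loop that runs the secondary loop.
class ToyPort {
public:
  ToyPort(ToyPrimaryLoop& primary, ToySecondaryLoop& secondary)
      : primary_(primary), secondary_(secondary) {
    secondary_.setRunnable = [this](bool runnable) { onSetRunnable(runnable); };
  }
  ToyPort(const ToyPort&) = delete;
  ToyPort& operator=(const ToyPort&) = delete;

private:
  void onSetRunnable(bool runnable) {
    runnable_ = runnable;
    if (!runnable_ || scheduled_) return;  // post only when needed, and only once
    scheduled_ = true;
    primary_.post([this]() {
      scheduled_ = false;
      if (runnable_) secondary_.run();
    });
  }

  ToyPrimaryLoop& primary_;
  ToySecondaryLoop& secondary_;
  bool runnable_ = false;
  bool scheduled_ = false;
};

// Queues one event that queues a second one, then drives only the primary loop.
std::vector<std::string> runToyDemo() {
  std::vector<std::string> log;
  ToyPrimaryLoop primary;
  ToySecondaryLoop secondary;
  ToyPort port(primary, secondary);
  secondary.enqueue([&log, &secondary]() {
    log.push_back("first");
    secondary.enqueue([&log]() { log.push_back("second"); });
  });
  primary.runAll();
  return log;
}
```

In this toy, only the primary loop is ever driven directly, yet both queued events run, in order. A real adapter would additionally need the stream side (AsyncIoStream) that the message above calls the trickier part.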

Avi Shua

Apr 4, 2016, 10:50:02 AM
to Cap'n Proto, shu...@gmail.com
Hi,
Some time has passed, and I have now had a chance to get back to this.
I started to implement the asio-based EventPort and AsyncIoStream, but for some odd reason I can't get the promises to be fulfilled (i.e. their lambdas are never called). Any idea? Note that the asio part is working, and it does call the KJ event loop's run().

Relevant code snippets (only the relevant parts are attached, for clarity):


void asio_event_port::setRunnable(bool runnable)
{
    this->runnable = runnable;

    io_service.post([this]()
    {
        if (this->runnable && event_loop) {
            // NOTE - I see that this printf is printed, and event_loop->run (kj::event_loop) is indeed called.
            // Note that in the thread scope I've created a WaitScope which receives the eventloop in the c'tor,
            // before starting all of the below
            printf("Calling event loop run\n");
            event_loop->run();
        }
    });
}


kj::Promise<void> asio_kj_io_stream::connect(const std::string& host, const std::string& port)
{
    ip::tcp::resolver resolver(io_service);
    auto server_it = resolver.resolve({ host, port });
    auto paf = newPromiseAndFulfiller<void>();
    connectFulfiller = std::move(paf.fulfiller);

    asio::async_connect(asio_socket, server_it, [this](std::error_code, ip::tcp::resolver::iterator)
    {
        printf("Async connect lambda called\n");
        KJ_IF_MAYBE(f, this->connectFulfiller) {
            printf("About to fulfill!\n");  // NOTE - I see that this printf is printed
            f->get()->fulfill();
        }
        this->connectFulfiller = nullptr;
    });

    return kj::mv(paf.promise);
}

// Connect promise is of kj::ForkedPromise<void> type
connectPromise = kj_stream.connect("linor.checkpoint.com", "80").fork();
kj::Promise<void>* myPromise = new kj::Promise<void>(connectPromise.addBranch().then([]()
{
    printf("Connect finished!\n"); // Flow will never reach this point
}));

Avi Shua

Apr 5, 2016, 8:16:01 AM
to Cap'n Proto, shu...@gmail.com
Found the problem (or at least the solution). Eagerly evaluating the promise solved the issue. 
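
A likely reading of this fix: KJ evaluates `.then()` continuations lazily, so a continuation does not run until something consumes the resulting promise, for example by waiting on it or by calling `eagerlyEvaluate()` on it. The toy below models only that rule, using the standard library. `ToyLazyStep` and `continuationRuns` are hypothetical names chosen for illustration, and this is not KJ code.

```cpp
#include <functional>
#include <utility>

// Toy model of a lazily evaluated continuation: it runs only once its input
// has been fulfilled AND some consumer has demanded the result.
class ToyLazyStep {
public:
  explicit ToyLazyStep(std::function<void()> continuation)
      : continuation_(std::move(continuation)) {}

  // The upstream result became available (the role of fulfilling a promise).
  void fulfill() {
    fulfilled_ = true;
    maybeRun();
  }

  // A consumer asked for the result (the role of waiting or eager evaluation).
  void demand() {
    demanded_ = true;
    maybeRun();
  }

private:
  void maybeRun() {
    if (fulfilled_ && demanded_ && !ran_) {
      ran_ = true;
      continuation_();
    }
  }

  std::function<void()> continuation_;
  bool fulfilled_ = false;
  bool demanded_ = false;
  bool ran_ = false;
};

// Returns whether the continuation ran after the input was fulfilled.
bool continuationRuns(bool demandResult) {
  bool reached = false;
  ToyLazyStep step([&reached]() { reached = true; });
  if (demandResult) step.demand();
  step.fulfill();
  return reached;
}
```

With `continuationRuns(false)` the continuation is never reached even though the input was fulfilled, which matches the symptom in the earlier message; with `continuationRuns(true)` it runs.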