Integrating KJ with other systems

Vitali Lovich

Jul 3, 2020, 11:47:04 PM
to Cap'n Proto
I'm trying to integrate cap'n'proto with surrounding threads that don't use KJ event loops. On Android it's easier because I have an event loop to adapt to with ALooper (albeit it's crashing - details in the SO post).

For my Windows codebase I don't really have anything in the way of an event loop. Would it be a good idea to create a simple event loop implementation that just has a condition variable + a vector of callbacks to invoke & adapt it the same way I did with ALooper? Will I need more than that?
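For reference, a minimal sketch of the loop described above, in standard C++ rather than KJ: a mutex-protected vector of callbacks plus a condition variable, drained by the thread that owns the loop. `CallbackLoop`, `post()`, `run()` and `quit()` are hypothetical names. Hooking something like this up to KJ would presumably go through a custom `kj::EventPort` implementation, which this sketch does not attempt.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical minimal event loop: any thread may post() callbacks; the
// owning thread executes them in run() until quit() is requested.
class CallbackLoop {
 public:
  // Thread-safe: may be called from any thread.
  void post(std::function<void()> fn) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      pending_.push_back(std::move(fn));
    }
    cv_.notify_one();
  }

  // Thread-safe: asks run() to return once no callbacks are pending.
  void quit() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      quitRequested_ = true;
    }
    cv_.notify_one();
  }

  // Call on the owning thread. Runs callbacks in posting order and returns
  // after quit() has been requested and the queue is empty.
  void run() {
    std::unique_lock<std::mutex> lock(mutex_);
    for (;;) {
      cv_.wait(lock, [this] { return quitRequested_ || !pending_.empty(); });
      if (pending_.empty()) return;  // quit requested, nothing left to run
      std::vector<std::function<void()>> batch;
      batch.swap(pending_);
      lock.unlock();  // run callbacks without holding the lock
      for (auto& fn : batch) fn();
      batch.clear();  // destroy captures before re-locking
      lock.lock();
    }
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  std::vector<std::function<void()>> pending_;
  bool quitRequested_ = false;
};
```

Swapping the whole batch out under the lock keeps `post()` cheap for other threads and lets a callback post more work (or call `quit()`) without deadlocking.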

Kenton Varda

Jul 5, 2020, 1:18:35 PM
to Vitali Lovich, Cap'n Proto
I posted an answer on Stack Overflow.

--
You received this message because you are subscribed to the Google Groups "Cap'n Proto" group.
To unsubscribe from this group and stop receiving emails from it, send an email to capnproto+...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/capnproto/e4d62be1-d478-47f4-83bf-6b715237c414n%40googlegroups.com.

Vitali Lovich

Jul 5, 2020, 2:48:18 PM
to Kenton Varda, Cap'n Proto
Yeah, so I'm still working through getting executeSync working. I do wish executeAsync could still do the right thing when invoked from off-thread, to avoid creating artificial synchronization points, but I can see it being challenging to accomplish something like that without adding synchronization into the happy path too. Basically, the challenge I'm having is terminating a server's event loop. I *think* the answer I've stumbled upon is to wrap any futures I schedule with a cancellation object that I then cancel when I go to shut down the thread. I'm now seeing a crash on the non-shutdown path, though, which I'm trying to figure out.

I couldn't figure out how to get the message passing going. None of the I/O facilities in KJ look amenable to having one end not connected to the event loop it was created on. If you have any tips on how to make this work, I'd definitely be interested in learning more. It also wasn't clear from the documentation (which I may have overlooked) whether I/O objects created on one thread's loop can have one end living on another loop, and moreover how to adapt them back into a form that works without being on an event loop at all. For example, some of the documentation around capability transfers says it doesn't work on Windows, but other pieces seem to imply it might (maybe it's only if I'm trying to .

Re #3, I couldn't figure that out either. AsyncIoStream is somewhat amenable to it because you can shut it down off-thread, since that's a syscall that doesn't return a promise, but that doesn't work for the other ones, which have no way of closing the FD or switching it to blocking mode so that it can be used off-thread.

I did actually do #4, but I still ended up stuck at step 1, where I wasn't sure how to bridge a native KJ thread & a non-KJ thread. Additionally, various facilities within KJ don't play well (I *think*) with having a separate event loop (e.g. kj::getCurrentThreadExecutor(), IIRC) & I couldn't quite get it to work well (+ various facilities like setupAsyncIo aren't available & a bunch of I/O code seemingly needs to be reimplemented from scratch to get parity on certain things). There's also a lot of KJ re-implemented in the nodeJS piece in addition to the nodeJS integration, so it's hard to figure out at a glance which pieces are needed for loop adaptation & which are just for bridging with v8 (at least, not without spending a lot of time to fully grok that file).

If there are any improvements to KJ/capnp in this area that you have a semi-clear idea of how to architect, I'm happy to discuss them with you offline & work on a patch set for KJ/capnp. I've noticed it's a perennial pain point & solving it (& documenting it) would probably sand off some of the rough edges for new people. It might be a key improvement to land for 1.0.

Kenton Varda

Jul 5, 2020, 6:02:24 PM
to Vitali Lovich, Cap'n Proto
On Sun, Jul 5, 2020 at 1:48 PM Vitali Lovich <vlo...@gmail.com> wrote:
Yeah, so I'm still working through getting executeSync working. I do wish executeAsync could still do the right thing when invoked from off-thread, to avoid creating artificial synchronization points, but I can see it being challenging to accomplish something like that without adding synchronization into the happy path too. Basically, the challenge I'm having is terminating a server's event loop. I *think* the answer I've stumbled upon is to wrap any futures I schedule with a cancellation object that I then cancel when I go to shut down the thread. I'm now seeing a crash on the non-shutdown path, though, which I'm trying to figure out.

Make sure you're using Executor::addRef() to take a strong reference from the calling thread. Then if the receiving thread quits, the calling thread should get a DISCONNECTED exception, but shouldn't crash.

FWIW I think we *could* easily have something like execAsync() that doesn't require the caller to be a KJ thread -- but the lambda (and all its owned captures) would be destroyed in the receiving thread, and the calling thread wouldn't get any notification of completion (it would have to implement its own notification if needed). But that's probably fine for most use cases.
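A plain-C++ sketch of the two ideas above, assuming nothing about KJ internals. The caller holds a `std::shared_ptr` to the receiver's mailbox (loosely analogous to the strong reference from `Executor::addRef()`), so posting after the receiver has shut down is reported rather than crashing. `post()` is fire-and-forget: the lambda runs and is destroyed on the receiving thread, and the caller signals completion itself with a `std::promise`. `Mailbox`, `post()`, `runOne()` and `close()` are hypothetical, and returning `false` merely stands in for KJ's DISCONNECTED exception.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical mailbox shared via std::shared_ptr, so a caller's strong
// reference stays valid even after the receiving thread has exited.
class Mailbox {
 public:
  // Fire-and-forget. Returns false (standing in for a DISCONNECTED error)
  // if the receiver already closed the mailbox; otherwise `fn` will run
  // and be destroyed on the receiving thread.
  bool post(std::function<void()> fn) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      if (closed_) return false;
      queue_.push_back(std::move(fn));
    }
    cv_.notify_one();
    return true;
  }

  // Receiving thread: waits for and runs one callback. Returns false if
  // the mailbox is closed and empty.
  bool runOne() {
    std::function<void()> fn;
    {
      std::unique_lock<std::mutex> lock(mutex_);
      cv_.wait(lock, [this] { return closed_ || !queue_.empty(); });
      if (queue_.empty()) return false;
      fn = std::move(queue_.front());
      queue_.pop_front();
    }
    fn();  // runs (and is destroyed at return) without holding the lock
    return true;
  }

  // Receiving thread, on shutdown: later posts fail; callbacks still
  // queued are destroyed here, on this thread, without running.
  void close() {
    std::deque<std::function<void()>> dropped;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      closed_ = true;
      dropped.swap(queue_);
    }
    cv_.notify_all();
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  std::deque<std::function<void()>> queue_;
  bool closed_ = false;
};
```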
 
I couldn't figure out how to get the message passing going. None of the I/O facilities in KJ look amenable to having one end not connected to the event loop it was created on. If you have any tips on how to make this work, I'd definitely be interested in learning more. It also wasn't clear from the documentation (which I may have overlooked) whether I/O objects created on one thread's loop can have one end living on another loop, and moreover how to adapt them back into a form that works without being on an event loop at all. For example, some of the documentation around capability transfers says it doesn't work on Windows, but other pieces seem to imply it might (maybe it's only if I'm trying to .

Well, it's true you can't use newTwoWayPipe() because it returns a pair of KJ objects which can't transfer between threads.

But if you create your own OS pipe and then pass the file descriptors (or HANDLEs) to LowLevelAsyncIoProvider to wrap them, then you can certainly wrap one end in the KJ thread and do whatever you want with the other end.
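A POSIX-only sketch of that approach. `socketpair()` stands in for "your own OS pipe"; in a real program `fds[0]` would be handed to the KJ thread, presumably via something like `lowLevelProvider->wrapSocketFd(fds[0], kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP)`, while the non-KJ thread keeps using `fds[1]` with ordinary blocking calls. The KJ side is not shown (the main thread simply plays the reader), and the helper names are hypothetical.

```cpp
#include <cassert>
#include <cerrno>
#include <string>
#include <thread>
#include <sys/socket.h>
#include <unistd.h>

// Creates a connected pair of Unix-domain stream sockets.
bool makeChannel(int fds[2]) {
  return ::socketpair(AF_UNIX, SOCK_STREAM, 0, fds) == 0;
}

// Blocking write of the whole string, retrying on short writes and EINTR.
bool writeAll(int fd, const std::string& data) {
  size_t off = 0;
  while (off < data.size()) {
    ssize_t n = ::write(fd, data.data() + off, data.size() - off);
    if (n < 0) {
      if (errno == EINTR) continue;
      return false;
    }
    off += static_cast<size_t>(n);
  }
  return true;
}

// Blocking read until EOF (or error); returns everything received.
std::string readToEof(int fd) {
  std::string out;
  char buf[256];
  for (;;) {
    ssize_t n = ::read(fd, buf, sizeof buf);
    if (n < 0 && errno == EINTR) continue;
    if (n <= 0) break;
    out.append(buf, static_cast<size_t>(n));
  }
  return out;
}
```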

Alternatively you could have the KJ thread listen for network connections on a loopback port, and then connect to it from another thread.

Windows unfortunately doesn't have socketpair(), which would make these things a lot easier.
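Lacking `socketpair()`, a common workaround is to build the pair over loopback TCP, as suggested above: listen on 127.0.0.1 with an ephemeral port, connect, and accept. The sketch below uses POSIX sockets so it can be tried anywhere; on Windows the same shape would use Winsock (`WSAStartup()`, `SOCKET`, `closesocket()`). `loopbackPair()` is a hypothetical helper.

```cpp
#include <arpa/inet.h>
#include <cassert>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

// Emulates socketpair() over TCP loopback. On success fds[0] is the
// accepted (server) end and fds[1] the connecting (client) end.
bool loopbackPair(int fds[2]) {
  int listener = ::socket(AF_INET, SOCK_STREAM, 0);
  if (listener < 0) return false;
  sockaddr_in addr{};
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
  addr.sin_port = 0;  // let the OS pick a free ephemeral port
  socklen_t len = sizeof addr;
  int client = -1;
  int server = -1;
  if (::bind(listener, reinterpret_cast<sockaddr*>(&addr), sizeof addr) == 0 &&
      ::listen(listener, 1) == 0 &&
      ::getsockname(listener, reinterpret_cast<sockaddr*>(&addr), &len) == 0) {
    client = ::socket(AF_INET, SOCK_STREAM, 0);
    // The kernel completes the handshake via the backlog, so a blocking
    // connect() succeeds before accept() is called.
    if (client >= 0 &&
        ::connect(client, reinterpret_cast<sockaddr*>(&addr), sizeof addr) == 0) {
      server = ::accept(listener, nullptr, nullptr);
    }
  }
  ::close(listener);  // only needed until the one connection is accepted
  if (server < 0) {
    if (client >= 0) ::close(client);
    return false;
  }
  fds[0] = server;
  fds[1] = client;
  return true;
}
```

A hardened version should also check that the accepted peer really is the socket it just connected (e.g. by comparing addresses), since another local process could connect to the listener in the meantime.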
 
Re #3, I couldn't figure that out either. AsyncIoStream is somewhat amenable to it because you can shut it down off-thread, since that's a syscall that doesn't return a promise, but that doesn't work for the other ones, which have no way of closing the FD or switching it to blocking mode so that it can be used off-thread.

I did actually do #4, but I still ended up stuck at step 1, where I wasn't sure how to bridge a native KJ thread & a non-KJ thread. Additionally, various facilities within KJ don't play well (I *think*) with having a separate event loop (e.g. kj::getCurrentThreadExecutor(), IIRC) & I couldn't quite get it to work well (+ various facilities like setupAsyncIo aren't available & a bunch of I/O code seemingly needs to be reimplemented from scratch to get parity on certain things). There's also a lot of KJ re-implemented in the nodeJS piece in addition to the nodeJS integration, so it's hard to figure out at a glance which pieces are needed for loop adaptation & which are just for bridging with v8 (at least, not without spending a lot of time to fully grok that file).

Unfortunately, you do in fact have to re-implement AsyncIoStream if you use an alternate event loop, because you have to build it on that alternate event loop's own facilities for async socket I/O.

Vitali Lovich

Jul 6, 2020, 9:02:16 AM
to Kenton Varda, Cap'n Proto
On Sun, Jul 5, 2020 at 3:02 PM Kenton Varda <ken...@cloudflare.com> wrote:
Make sure you're using Executor::addRef() to take a strong reference from the calling thread. Then if the receiving thread quits, the calling thread should get a DISCONNECTED exception, but shouldn't crash.
Am I missing something? Executor doesn't inherit from refcount...
 
FWIW I think we *could* easily have something like execAsync() that doesn't require the caller to be a KJ thread -- but the lambda (and all its owned captures) would be destroyed in the receiving thread, and the calling thread wouldn't get any notification of completion (it would have to implement its own notification if needed). But that's probably fine for most use cases.
Yeah, that aligns with how things like libdispatch handle it, which I view as the gold standard for this kind of stuff.
  
Well, it's true you can't use newTwoWayPipe() because it returns a pair of KJ objects which can't transfer between threads.

But if you create your own OS pipe and then pass the file descriptors (or HANDLEs) to LowLevelAsyncIoProvider to wrap them, then you can certainly wrap one end in the KJ thread and do whatever you want with the other end.

Alternatively you could have the KJ thread listen for network connections on a loopback port, and then connect to it from another thread.

Windows unfortunately doesn't have socketpair(), which would make these things a lot easier.
Yeah, that's what I figured. The KJ API is soooo much more user friendly though :). It would be cool to be able to consume the raw file descriptor/socket out of the pipes the LowlevelIoProvider constructs to simplify code/error handling.

Kenton Varda

Jul 6, 2020, 10:14:56 AM
to Vitali Lovich, Cap'n Proto
On Mon, Jul 6, 2020 at 8:02 AM Vitali Lovich <vlo...@gmail.com> wrote:
On Sun, Jul 5, 2020 at 3:02 PM Kenton Varda <ken...@cloudflare.com> wrote:
Make sure you're using Executor::addRef() to take a strong reference from the calling thread. Then if the receiving thread quits, the calling thread should get a DISCONNECTED exception, but shouldn't crash.
Am I missing something? Executor doesn't inherit from refcount...


This was added fairly recently, though. Maybe it wasn't in 0.8.0.
 
 Yeah, that's what I figured. The KJ API is soooo much more user friendly though :). It would be cool to be able to consume the raw file descriptor/socket out of the pipes the LowlevelIoProvider constructs to simplify code/error handling.

Yeah, I think adding `kj::Maybe<int> getFd();` (and `kj::Maybe<void*> getHandle();` on Windows) to the `AsyncIoStream` interface is probably something we should do. I resisted this for a long time since it breaks the abstraction a bit, but there are just too many places where it would be useful, especially for interoperability and optimizations.
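To make the shape of that proposal concrete, here is a sketch using `std::optional<int>` in place of `kj::Maybe<int>`: the accessor defaults to "no descriptor", so existing implementations (in-memory streams, wrappers, app-defined streams) are unaffected, and only descriptor-backed streams override it. `Stream`, `MemoryStream` and `FdStream` are hypothetical stand-ins, not KJ's `AsyncIoStream`.

```cpp
#include <cassert>
#include <optional>
#include <string>
#include <unistd.h>

// Hypothetical stream interface with an opt-in descriptor accessor.
class Stream {
 public:
  virtual ~Stream() = default;
  virtual void write(const std::string& data) = 0;
  // Default: this stream is not backed by an OS descriptor.
  virtual std::optional<int> getFd() const { return std::nullopt; }
};

// In-memory stream: nothing to expose, so it keeps the default.
class MemoryStream final : public Stream {
 public:
  void write(const std::string& data) override { buffer += data; }
  std::string buffer;
};

// Descriptor-backed stream: owns `fd` and exposes it without giving up
// ownership (it is still closed by the destructor).
class FdStream final : public Stream {
 public:
  explicit FdStream(int fd) : fd_(fd) {}
  ~FdStream() override { ::close(fd_); }
  FdStream(const FdStream&) = delete;
  FdStream& operator=(const FdStream&) = delete;
  void write(const std::string& data) override {
    ssize_t n = ::write(fd_, data.data(), data.size());
    (void)n;  // simplified: short writes and errors are ignored here
  }
  std::optional<int> getFd() const override { return fd_; }

 private:
  int fd_;
};
```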

-Kenton

Vitali Lovich

Jul 6, 2020, 10:28:51 AM
to Kenton Varda, Cap'n Proto
On Mon, Jul 6, 2020 at 7:14 AM Kenton Varda <ken...@cloudflare.com> wrote:
On Mon, Jul 6, 2020 at 8:02 AM Vitali Lovich <vlo...@gmail.com> wrote:
On Sun, Jul 5, 2020 at 3:02 PM Kenton Varda <ken...@cloudflare.com> wrote:
Make sure you're using Executor::addRef() to take a strong reference from the calling thread. Then if the receiving thread quits, the calling thread should get a DISCONNECTED exception, but shouldn't crash.
Am I missing something? Executor doesn't inherit from refcount...


This was added fairly recently, though. Maybe it wasn't in 0.8.0.
Yeah that's why. I'll just update to the latest snapshot then.

 Yeah, that's what I figured. The KJ API is soooo much more user friendly though :). It would be cool to be able to consume the raw file descriptor/socket out of the pipes the LowlevelIoProvider constructs to simplify code/error handling.

Yeah, I think adding `kj::Maybe<int> getFd();` (and `kj::Maybe<void*> getHandle();` on Windows) to the `AsyncIoStream` interface is probably something we should do. I resisted this for a long time since it breaks the abstraction a bit, but there are just too many places where it would be useful, especially for interoperability and optimizations.
 
And similarly, transfer ownership out by giving AsyncIoProvider an unwrap method that takes an Own<AsyncIoStream> and returns an OwnFd?

Kenton Varda

Jul 6, 2020, 11:13:09 AM
to Vitali Lovich, Cap'n Proto
On Mon, Jul 6, 2020 at 9:28 AM Vitali Lovich <vlo...@gmail.com> wrote:
 Yeah, that's what I figured. The KJ API is soooo much more user friendly though :). It would be cool to be able to consume the raw file descriptor/socket out of the pipes the LowlevelIoProvider constructs to simplify code/error handling.

Yeah, I think adding `kj::Maybe<int> getFd();` (and `kj::Maybe<void*> getHandle();` on Windows) to the `AsyncIoStream` interface is probably something we should do. I resisted this for a long time since it breaks the abstraction a bit, but there are just too many places where it would be useful, especially for interoperability and optimizations.
 
And similarly, transfer ownership out by giving AsyncIoProvider an unwrap method that takes an Own<AsyncIoStream> and returns an OwnFd?

I think that could again be a method on the stream itself; doesn't need to be on AsyncIoProvider.

OTOH, it might not be necessary to support ownership transfer. Instead, you could call getFd() and then dup() it (or DuplicateHandle() on Windows). Then you can destroy the AsyncIoStream, which will close its copy of the handle, but you still have the duplicate. But if implementing a `Maybe<AutoCloseFd> releaseFd();` method seems easy, then I'm fine with that.
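A sketch contrasting the two options, with hypothetical names and `std::optional<int>` standing in for `kj::Maybe`/`AutoCloseFd`: borrowing via `getFd()` and `dup()`-ing the result so the copy outlives the stream, versus handing ownership over with a `releaseFd()` that leaves the stream holding nothing. Neither method is presented as an existing KJ API.

```cpp
#include <cassert>
#include <optional>
#include <unistd.h>

// Hypothetical descriptor-owning stream supporting both approaches.
class OwningStream {
 public:
  explicit OwningStream(int fd) : fd_(fd) {}
  ~OwningStream() {
    if (fd_ >= 0) ::close(fd_);
  }
  OwningStream(const OwningStream&) = delete;
  OwningStream& operator=(const OwningStream&) = delete;

  // Borrow: the stream still owns (and will close) this descriptor.
  std::optional<int> getFd() const {
    if (fd_ < 0) return std::nullopt;
    return fd_;
  }

  // Transfer: the caller now owns the descriptor; the stream forgets it.
  std::optional<int> releaseFd() {
    if (fd_ < 0) return std::nullopt;
    int fd = fd_;
    fd_ = -1;
    return fd;
  }

 private:
  int fd_;
};
```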

-Kenton 

Vitali Lovich

Jul 6, 2020, 11:22:09 AM
to Kenton Varda, Cap'n Proto
I don't feel great about the duplication approach (or allowing retrieving the raw handle directly) as it can be subtly tricky to actually use correctly. For example, if you dup & make the dup'ed FD blocking, that will impact correct behavior within cap'n'proto. So as a user you'd end up wanting to always close the other FD after retrieving it. Since all usages have you invalidating the stream & consuming the FD, putting it in the provider is nice: it's symmetrical, you clearly have ownership of the stream, and it's a natural spot to have flags to automatically make the socket/FD blocking again.
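The blocking-mode pitfall is real on POSIX systems: `O_NONBLOCK` is a file status flag stored on the open file description, which `dup()` shares, so clearing it on the duplicate also clears it for the descriptor the event loop is using. (`FD_CLOEXEC`, by contrast, is per descriptor.) A small demonstration with hypothetical helper names:

```cpp
#include <cassert>
#include <fcntl.h>
#include <unistd.h>

// True if O_NONBLOCK is set on fd's open file description.
bool isNonBlocking(int fd) {
  int flags = ::fcntl(fd, F_GETFL);
  return flags >= 0 && (flags & O_NONBLOCK) != 0;
}

// Sets or clears O_NONBLOCK; returns false on failure.
bool setNonBlocking(int fd, bool on) {
  int flags = ::fcntl(fd, F_GETFL);
  if (flags < 0) return false;
  flags = on ? (flags | O_NONBLOCK) : (flags & ~O_NONBLOCK);
  return ::fcntl(fd, F_SETFL, flags) == 0;
}
```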


Kenton Varda

Jul 6, 2020, 11:34:09 AM
to Vitali Lovich, Cap'n Proto
On Mon, Jul 6, 2020 at 10:22 AM Vitali Lovich <vlo...@gmail.com> wrote:
I don't feel great about the duplication approach (or allowing retrieving the raw handle directly) as it can be subtly tricky to actually use correctly. For example, if you dup & make the dup'ed FD blocking, that will impact correct behavior within cap'n'proto. So as a user you'd end up wanting to always close the other FD after retrieving it. Since all usages have you invalidating the stream & consuming the FD, putting it in the provider is nice: it's symmetrical, you clearly have ownership of the stream, and it's a natural spot to have flags to automatically make the socket/FD blocking again.

I think there are a lot of use cases where you wouldn't want to destroy the AsyncIoStream object, though. For example, if I have a web server that's serving a file from disk, it might call getFd() so that it can use sendfile() to push bytes out without copying through userspace. But the stream can be reused for another HTTP request after that, so it can't be destroyed in the meantime.

Hmm, in fact, for this use case it seems like the web server has to be able to get the FD *and* the UnixEventPort::FdObserver object together, so that it can use the latter to observe when the socket is writable. That's a bit of a problem since FdObserver is currently specific to UnixEventPort; when using something like the libuv compatibility layer in node-capnp, this wouldn't work. Maybe the answer is to define an FdObserver abstract interface. At least the way sockets work is pretty consistent across Unix variants. (On Windows, of course, something entirely different is needed... but that was already true.)
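`UnixEventPort::FdObserver` is what lets KJ code wait for a descriptor to become readable or writable (e.g. via `whenBecomesWritable()`). The plain-POSIX sketch below shows the underlying pattern such an observer abstracts for a non-blocking socket: write until `EAGAIN`, then wait for `POLLOUT` and resume. Here `poll()` blocks the calling thread; inside an event loop, the observer would instead suspend only the pending operation. `writeAllNonBlocking()` is a hypothetical helper.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <fcntl.h>
#include <poll.h>
#include <sys/socket.h>
#include <thread>
#include <unistd.h>
#include <vector>

// Writes `len` bytes to a non-blocking fd. When the kernel buffer is full
// (EAGAIN), waits until the fd is writable again, then continues.
bool writeAllNonBlocking(int fd, const char* data, size_t len) {
  while (len > 0) {
    ssize_t n = ::write(fd, data, len);
    if (n > 0) {
      data += n;
      len -= static_cast<size_t>(n);
      continue;
    }
    if (n < 0 && errno == EINTR) continue;
    if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
      pollfd pfd{fd, POLLOUT, 0};
      if (::poll(&pfd, 1, -1) < 0 && errno != EINTR) return false;
      continue;
    }
    return false;
  }
  return true;
}
```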

I don't like requiring an AsyncIoProvider to unwrap because it's generally uncommon to pass the AsyncIoProvider down into libraries. I think these kinds of optimizations should be possible to implement without requiring the library to take whole new, wide interfaces.

-Kenton

Vitali Lovich

Jul 6, 2020, 11:54:24 AM
to Kenton Varda, Cap'n Proto
That's fair, but I think the problem of weakly obtaining the FD still has a lot of pitfalls. Even if you return all the things you need, you potentially have scheduled I/O in-flight (e.g. you may be pumping the FD). So you need a way to plug it or your sendfile will crap out unless you're doing it inside the KJ loop (i.e. you can't defer the sendfile to a background thread & continue to process any remaining I/O). Co-ordinating all of that could be pretty tricky. I think a potential middle ground for this might be to have the stream give you the fd and a fulfiller. Any I/O operations on the stream then are blocked from executing until after the promise to return the FD is fulfilled. I don't think the WebServer would need an FdObserver because you could return the FD in blocking mode & the user could dispatch the I/O to a background thread & get back onto the executor to fulfill the promise once complete. Not like sendfile and non-blocking interact all that well.

I don't know how to tackle the library problem for the interface, but I also don't have the experience to understand how such libraries are written. The way I've structured all of my libraries is that either they own the cap'n'proto loop as an implementation detail running on its own thread OR they implement the capability & thus the low-level I/O generally doesn't come up (although maybe it does if you're shuttling FDs around; I haven't gotten to that point yet). I think for the simple use cases, consuming the stream is the correct option, as it will end up crashing if you have any in-flight promises (& lifting that restriction would be a requirement for doing so in the above case).

So it feels like a straightforward QOL addition to AsyncIoProvider with minimal complexity, & the richer, more powerful API on the streams themselves could be done separately, focusing on all the nuances of how it should behave.


Kenton Varda

Jul 13, 2020, 8:28:44 PM
to Vitali Lovich, Cap'n Proto
On Mon, Jul 6, 2020 at 10:54 AM Vitali Lovich <vlo...@gmail.com> wrote:
I think a potential middle ground for this might be to have the stream give you the fd and a fulfiller. Any I/O operations on the stream then are blocked from executing until after the promise to return the FD is fulfilled.

That's an interesting idea. Or rather, I'd have it return a "BorrowedFd" object which gives you access to the FD, and whose destructor unblocks the stream. 

That said, it is already the case today that concurrent operations on a stream in the same direction (e.g. calling write() again when a previous write() hasn't finished) are undefined behavior. By the same logic as your argument, we should instead have the second write() block until the first one finishes. And I would indeed agree that, in theory, that is better behavior. But, I don't like the amount of complexity that creates for the stream implementation. If AsyncStreamFd were the only implementation of AsyncIoStream, then I'd say, fine, we can handle it. But lots of other places, including apps, implement AsyncIoStream in various ways, and it would be a big burden for all of them to handle concurrent operations gracefully. Meanwhile, this behavior would be of very little benefit to the vast majority of callers; in practice, streams naturally tend to be accessed sequentially.

So, I prefer to keep the burden on the caller to make sure they don't issue concurrent operations. And it seems to me that this policy extends naturally to borrowing the FD.
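A sketch of the "BorrowedFd" idea under stated assumptions: the stream hands out an RAII object exposing the descriptor, and that object's destructor ends the borrow. In line with the caller-burden policy above, nothing is queued; using the stream while the descriptor is borrowed is a caller bug. Throwing `std::logic_error` to detect that is a choice of this sketch only (the policy described above would simply leave it undefined), and `BorrowableStream`/`BorrowedFd` are hypothetical names. For brevity the stream does not own or close the descriptor.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <unistd.h>

class BorrowableStream;

// Hypothetical RAII borrow: exposes the fd; destruction ends the borrow.
class BorrowedFd {
 public:
  BorrowedFd(BorrowableStream& stream, int fd) : stream_(&stream), fd_(fd) {}
  BorrowedFd(BorrowedFd&& other) noexcept : stream_(other.stream_), fd_(other.fd_) {
    other.stream_ = nullptr;  // moved-from object no longer ends the borrow
  }
  BorrowedFd(const BorrowedFd&) = delete;
  BorrowedFd& operator=(const BorrowedFd&) = delete;
  BorrowedFd& operator=(BorrowedFd&&) = delete;
  ~BorrowedFd();
  int get() const { return fd_; }

 private:
  BorrowableStream* stream_;
  int fd_;
};

class BorrowableStream {
 public:
  explicit BorrowableStream(int fd) : fd_(fd) {}

  BorrowedFd borrowFd() {
    if (borrowed_) throw std::logic_error("fd already borrowed");
    borrowed_ = true;
    return BorrowedFd(*this, fd_);
  }

  // Using the stream during a borrow is reported as a caller error.
  ssize_t write(const void* data, size_t len) {
    if (borrowed_) throw std::logic_error("stream used while fd is borrowed");
    return ::write(fd_, data, len);
  }

  bool isBorrowed() const { return borrowed_; }

 private:
  friend class BorrowedFd;
  int fd_;
  bool borrowed_ = false;
};

inline BorrowedFd::~BorrowedFd() {
  if (stream_ != nullptr) stream_->borrowed_ = false;
}
```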
 
I don't think the WebServer would need an FdObserver because you could return the FD in blocking mode & the user could dispatch the I/O to a background thread & get back onto the executor to fulfill the promise once complete. Not like sendfile and non-blocking interact all that well.

I don't agree here. I don't think most use cases would call for spawning background threads to do blocking I/O. splice(), for example, interacts with non-blocking IO quite nicely. (And whether or not you consider sendfile() to play nice with non-blocking IO kind of gets into a philosophical debate of whether the disk should properly be treated as a remote device vs. "L5 memory". Not everyone agrees on this topic.)

Also, switching the FD to blocking mode would imply that you *cannot* use this method while having any async reads or writes in-flight, even if you don't plan to issue any conflicting reads or writes. E.g. what if you just want the FD to perform an ioctl() on it? Or you want to do manual writes but use the regular KJ machinery for reads?
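As an example of the ioctl() case: querying how many bytes are waiting (`FIONREAD`) works on a non-blocking descriptor and needs no change of mode, and since it consumes no data it need not disturb whatever async reads or writes the event loop has pending. `bytesAvailable()` is a hypothetical helper; `FIONREAD` is supported for sockets and pipes on Linux and the BSDs.

```cpp
#include <cassert>
#include <fcntl.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <unistd.h>

// Returns the number of bytes waiting to be read on fd, or -1 on error.
// Reads nothing and leaves the descriptor's blocking mode untouched.
int bytesAvailable(int fd) {
  int n = 0;
  if (::ioctl(fd, FIONREAD, &n) < 0) return -1;
  return n;
}
```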

-Kenton