Hi,
Thanks a lot for pointers, that saved me a lot of work. In the end I managed it with the (high level) AsyncIoProvider so I should get portability for free if wider system support is added later. In case anybody googles this later a rough outline of what I did is below.
(slightly from memory)
kj::Own<AsyncInputStream> threadContactStream; // to send instructions to capnpThread
std::deque<std::function<void()> queue; // Queue of commands (RPC requests or whatever) to run on capnpThread
std::mutex queueMutex;
MyRPC::Client rpcInterface; // the remote capability, not actually used in this example
std::thread capnpThread=std::thread( [&]() {
capnp::EzRpcClient client(URI);
rpcInterface=client.getMain<MyRPC>();
// Get a pipe so that the main thread can tell this thread when to wake up
auto myPipe=client.getIoProvider().newOneWayPipe();
threadContactStream=std::move(myPipe.out);
// Need a std::condition_variable::notify_all (or equivalent) here to tell the
// main thread everything has been constructed and it's safe to continue.
char buffer=1; // Anything other than 0 to get started
while( buffer!=0 )
{
// Wait for a single byte to be sent on the pipe
myPipe.in->read( &buffer, 1 ).wait( client.getWaitScope() );
// Move all current requests out of the shared deque, so that
// a lock is not kept while the functions are running.
std::deque<std::function<void()>> copiedRequests;
{ // Block to limit the lock
std::lock_guard<std::mutex> lock(queueMutex);
copiedRequests.swap(queue);
}
for( auto& function : copiedRequests )
{
if( function ) function();
}
}
rpcInterface=nullptr; // This is pretty important, seems to have issues otherwise
});
// Need a std::condition_variable::wait (or equivalent) here to make sure the other
// thread has time to start and initialise shared objects.
// Push some commands to the other thread
{ // block to limit lock's scope
std::lock_guard<std::mutex> lock(queueMutex);
queue.push_back( [](){ std::cout << "This runs on the client's EventLoop thread!" << std::endl; } );
}
// tell the capnp thread to wake up and check the queue
const char wakeSignal=1; // Anything other than 0
threadContactStream->write( &wakeSignal, 1 );
<do other stuff>
// Tell the capnp thread to quit, then wait for it.
const char quitSignal=0;
threadContactStream->write( &quitSignal, 1 );
capnpThread.join();
At the risk of stating the blindingly obvious, if you're testing on a single machine the server event loop doesn't execute with the client "wait()", which probably means starting the server on yet another thread and giving it a NEVER_DONE. Not that I got stuck on that point for half a day with a non responsive server or anything...