
Storing interfaces for later use


kknb...@gmail.com

Aug 25, 2016, 1:22:35 PM
to Cap'n Proto
Hi,
I'm trying to develop an application that replicates remote objects locally and keeps them updated.  This effectively boils down to a publication-subscription model where the local proxy object sends an interface (capability in capnp speak?) to the server that can be used to send it notifications.  The server stores this interface pointer and responds with an "okay", then later uses the interface to send the local object notifications if the server-side object changes.
It doesn't work however.  If I use the interface pointer after the original "subscription" call has resolved, the message is never received on the client.  If I use the interface _during_ the subscription request the message is received by the client, but the "then(...)" block on the server is not executed.  I've looked through the calculator example, but that uses the client-side interface immediately.

- Can the interface pointers (capabilities right?) given in one call be stored for later use?  I assume this must be the case but I can't be doing it correctly.
- I'm using the ez-rpc interfaces.  Is this one of the use cases not supported by them?
- As this is my most basic test case, both the client and server are in the same compilation unit.  I don't do any thread manipulation myself so I assume they're on the same event loop.  Is this an issue?  If so, I want the single process use case for testing, so is starting one or the other on a different thread sufficient?

I can reduce the code to a simple example, but before going to the effort I wanted to make sure I'm not misunderstanding something.

Thanks,

Mark.

Cap'n Proto version 0.5.3

Kenton Varda

Aug 25, 2016, 1:39:27 PM
to kknb...@gmail.com, Cap'n Proto
Hi Mark,

Terminology note: Think of "capability" as meaning the same thing as "pointer", except it's a pointer that cannot be forged and without which you cannot otherwise access the target object. An "interface" describes the methods implemented by an object to which a capability points.

What you describe should work, but there are a number of ways you might be getting it wrong. It's hard to say which without seeing your code, but here are some things to check:

- Are you tearing down the RPC client too early? Once the network connection closes, all capabilities delivered through it become disconnected. Make sure you construct the RPC client (e.g. EzRpcClient) once, not every time you want to make a call.

- Are you discarding promises before they complete? When you have a kj::Promise<T> representing an asynchronous task which hasn't completed yet, and you allow that promise to go out-of-scope (without calling .then() or anything else on it), the asynchronous task will be canceled. To prevent this, create a kj::TaskSet and add promises to the set -- the TaskSet ensures that the task runs to completion (unless you destroy the TaskSet, of course). For example:

    void doThings(kj::TaskSet& tasks) {
      kj::Promise<void> task1 = doSomething().then(...).then(...);
      kj::Promise<void> task2 = doSomething().then(...).then(...);

      tasks.add(kj::mv(task1));
    }

In the code above, only task1's .then()s will ever execute -- task2 will be canceled when the function returns.

- Another reason why a .then() continuation might not execute is if an exception was thrown. You can catch exceptions by providing a second callback to .then() which takes kj::Exception as the parameter. Note that when creating a kj::TaskSet, you will be forced to provide an error callback which will be used whenever a task throws an exception -- I usually do KJ_LOG(ERROR, exception) inside that callback.

-Kenton


Mark Grimes

Aug 25, 2016, 8:25:25 PM
to Kenton Varda, Cap'n Proto
Hi,
Thanks for taking the time to respond.  I was already doing all of those things. Simple communication works fine, but it breaks when trying to pass a client capability to the server for it to use in the "publish" phase.
I eventually figured out if I add in a

kj::NEVER_DONE.wait( client.getWaitScope() );

at the end everything works, but it blocks at the end of the test fixture (as you would expect).  I think I fundamentally misunderstood where the event loop processing is done. I thought this was performed on a separate thread, so a simple "this_thread::sleep_for(...)" would allow all the handlers in the queue to finish (not optimal, I know, but I wanted to make sure everything worked first).  I now know (think?) there must be a "wait()" _somewhere_ to enter the event loop again on the current thread and clear the queue.

Since I'm using this in test fixtures, NEVER_DONE is not an option because I want to return control and carry on with other tests.

- Is there a way with the ez-rpc interfaces to process everything currently in the queue and then return control?  I see EventLoop has a "run()" method but I don't see a way to get the EventLoop from the EzRpcServer/Client.
- If not, is there something like "kj::DO_UNTIL.wait( seconds(1), client.getWaitScope() )", or a std::condition_variable style notification to stop?
- With both client and server on the same event loop, there's no difference between "client.getWaitScope()" and "server.getWaitScope()" right?

It does seem to be a great library by the way, the bits I've managed to get working.

Thanks,

Mark.

Mark Grimes

Aug 26, 2016, 1:50:14 PM
to Kenton Varda, Cap'n Proto
Hi,
As a temporary workaround I'm just making a request I don't need on the main capability, after all the calls that I actually want.  I then wait on that promise to clear the queue.  The problem is that the replication (i.e. publication) happens in the background so I can't wait on those promises, and the event loop needs to be entered for the handlers to run.
That's good enough for now.  I'm not sure how to handle the event loop in the client once I get to production code, but I'll cross that bridge when I come to it.

Thanks for the pointers.


Kenton Varda

Aug 26, 2016, 6:22:37 PM
to Mark Grimes, Cap'n Proto
Hi Mark,

Cap'n Proto is single-threaded. The event loop happens on the current thread.

The event loop will run whenever you call .wait() on a promise. What you need to do is create a promise that completes when your test is "done", then wait on that.

One way to do this might be to use a promise/fulfiller pair. See kj::newPromiseAndFulfiller<>(). This returns a pair of a promise and a fulfiller, where the fulfiller can be called later in order to provide the result which will then be returned by the promise. So, in your main function, you would call .wait() on the promise, and then in some other code, when you determine that it's time to exit, you would call fulfiller->fulfill().

If you don't have any particular way to detect "doneness" and instead want to wait for some amount of time, you can use the timer:

    ezRpcClient.getIoProvider().getTimer().afterDelay(1 * kj::SECONDS).wait(ezRpcClient.getWaitScope());

However, using time delays like this in tests leads to slow, flaky tests, so I don't recommend it.

-Kenton

Mark Grimes

Sep 8, 2016, 5:27:51 PM
to Cap'n Proto
Arghh, forgot reply all for the group.  Message below...

---------- Forwarded message ----------
From: Mark Grimes <kknb...@gmail.com>
Date: 8 September 2016 at 22:24
Subject: Re: [capnproto] Storing interfaces for later use
To: Kenton Varda <ken...@sandstorm.io>


Hi,
Coming back to this now that I'm ready for higher level tests.

I'm a bit confused by newPromiseAndFulfiller.  Trying myself and also reading a few posts in this group, it's not possible to fulfil a promise from a different thread.  How is it possible to fulfil a promise if the thread that has to fulfil it is blocked in a wait()?  I guess it's in the event loop and not technically blocked, but as far as my code is concerned it's as good as blocked.  So I'm a bit confused by what its use is outside the EventLoop internals or "then()" blocks.

What I currently have is an override of EventPort which is thread synched and takes requests to create (or fulfil) promises on the EventLoop thread.  This seems to work fine for basic promises.  Coming to use RPC however, it looks like I'm going to have to implement everything the standard EventPort does, as well as carbon copies of every class from LowLevelAsyncIoProviderImpl up where the only difference is that it uses my EventPort instead of UnixEventPort.  This seems so unnecessarily complex I must be doing something wrong.  I'm happy to do this but wanted to make sure I hadn't missed something simpler.

To describe my original problem, in case I've gone so far down the wrong road the original issue is lost:
I need to create an API which gives results from RPC calls as std::futures (so far easy), but the server and client will be periodically replicating data in the background without input from my API's user.  This implies I need the Cap'n Proto client (Ez or otherwise) running on another thread, but also able to take RPC requests from the main thread.  What's the best way of doing this?

Thanks,

Mark.

Kenton Varda

Sep 8, 2016, 6:45:03 PM
to Mark Grimes, Cap'n Proto
Hi Mark,

Usually, the best way to use Cap'n Proto is in a program that is 100% single-threaded and event-driven. It's possible to integrate the KJ event loop with other event loop frameworks such that they can cooperate within the same thread -- see, for example, node-capnp, which integrates with libuv (the event loop used by node.js).

However, if you have no knowledge of the event loop used by the application that calls your library, then you will indeed probably need to use a separate thread. Probably what you will need to do here is create a pipe, where the main thread holds the write end of the pipe, and the capnp thread holds the read end. In the capnp thread, use LowLevelAsyncIoProvider to wrap the pipe FD so that you can listen on it asynchronously. Whenever an RPC is initiated from the main thread, add it to a mutex-protected queue, then write a byte to the pipe to wake up the capnp thread. The capnp thread should then consume requests from the queue.

I do plan to add a nicer API for cross-thread events eventually, but it's one of the long list of things that we haven't yet needed for Sandstorm and so it hasn't been prioritized.

-Kenton

Mark Grimes

Sep 9, 2016, 4:23:26 PM
to Kenton Varda, Cap'n Proto
Hi,
Thanks a lot for the pointers, that saved me a lot of work.  In the end I managed it with the (high level) AsyncIoProvider so I should get portability for free if wider system support is added later.  In case anybody googles this later, a rough outline of what I did is below.


(slightly from memory)

    #include <capnp/ez-rpc.h>
    #include <kj/async-io.h>
    #include <deque>
    #include <functional>
    #include <iostream>
    #include <mutex>
    #include <thread>

    kj::Own<kj::AsyncOutputStream> threadContactStream;  // write end of the pipe, to send instructions to capnpThread
    std::deque<std::function<void()>> queue;  // Queue of commands (RPC requests or whatever) to run on capnpThread
    std::mutex queueMutex;
    MyRPC::Client rpcInterface( nullptr );  // the remote capability, not actually used in this example

    std::thread capnpThread=std::thread( [&]() {
        capnp::EzRpcClient client(URI);
        rpcInterface=client.getMain<MyRPC>();

        // Get a pipe so that the main thread can tell this thread when to wake up
        auto myPipe=client.getIoProvider().newOneWayPipe();
        threadContactStream=kj::mv(myPipe.out);

        // Need a std::condition_variable::notify_all (or equivalent) here to tell the
        // main thread everything has been constructed and it's safe to continue.

        char buffer=1; // Anything other than 0 to get started
        while( buffer!=0 )
        {
            // Wait for a single byte to be sent on the pipe
            myPipe.in->read( &buffer, 1 ).wait( client.getWaitScope() );

            // Move all current requests out of the shared deque, so that
            // a lock is not kept while the functions are running.
            std::deque<std::function<void()>> copiedRequests;
            { // Block to limit the lock
                std::lock_guard<std::mutex> lock(queueMutex);
                copiedRequests.swap(queue);
            }
            for( auto& function : copiedRequests )
            {
                if( function ) function();
            }
        }

        rpcInterface=nullptr;  // This is pretty important, seems to have issues otherwise
    });

    // Need a std::condition_variable::wait (or equivalent) here to make sure the other
    // thread has time to start and initialise shared objects.

    // Push some commands to the other thread
    { // block to limit lock's scope
        std::lock_guard<std::mutex> lock(queueMutex);
        queue.push_back( [](){ std::cout << "This runs on the client's EventLoop thread!" << std::endl; } );
    }
    // tell the capnp thread to wake up and check the queue
    const char wakeSignal=1;  // Anything other than 0
    threadContactStream->write( &wakeSignal, 1 );

    // <do other stuff>

    // Tell the capnp thread to quit, then wait for it.
    const char quitSignal=0;
    threadContactStream->write( &quitSignal, 1 );
    capnpThread.join();


At the risk of stating the blindingly obvious, if you're testing on a single machine the server event loop doesn't execute with the client "wait()", which probably means starting the server on yet another thread and giving it a NEVER_DONE.  Not that I got stuck on that point for half a day with a non responsive server or anything...
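
For the std::future requirement mentioned earlier in the thread, one possible way to layer futures on top of a command queue like the one above is std::packaged_task. The sketch below uses only the standard library and is an assumption-laden illustration, not the code from this thread: `WorkerQueue` and `submit` are hypothetical names, and a std::condition_variable stands in for the pipe. With a real KJ event loop the pipe is still needed, because a thread blocked in wait() cannot be woken by a condition variable.

```cpp
#include <condition_variable>
#include <deque>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <thread>

// Hypothetical helper: runs submitted callables on one worker thread and hands
// each result back to the caller as a std::future.
class WorkerQueue {
public:
  WorkerQueue() : worker_([this] { run(); }) {}

  ~WorkerQueue() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      quit_ = true;
    }
    wake_.notify_one();
    worker_.join();
  }

  template <typename Func>
  auto submit(Func func) -> std::future<decltype(func())> {
    using Result = decltype(func());
    // packaged_task is move-only, so it is wrapped in a shared_ptr to fit
    // into the copyable std::function stored in the queue.
    auto task = std::make_shared<std::packaged_task<Result()>>(std::move(func));
    std::future<Result> future = task->get_future();
    {
      std::lock_guard<std::mutex> lock(mutex_);
      queue_.push_back([task] { (*task)(); });
    }
    wake_.notify_one();  // plays the role of writing a byte to the pipe
    return future;
  }

private:
  void run() {
    for (;;) {
      std::deque<std::function<void()>> batch;
      {
        std::unique_lock<std::mutex> lock(mutex_);
        wake_.wait(lock, [this] { return quit_ || !queue_.empty(); });
        if (queue_.empty()) return;  // quit requested and nothing left to run
        batch.swap(queue_);          // release the lock before running jobs
      }
      for (auto& job : batch) job();
    }
  }

  std::mutex mutex_;
  std::condition_variable wake_;
  std::deque<std::function<void()>> queue_;
  bool quit_ = false;
  std::thread worker_;  // declared last so the members above exist before it starts
};
```

A caller would write something like `auto answer = queue.submit([] { return 6 * 7; });` and later call `answer.get()`. In the Cap'n Proto case the submitted job would presumably start the RPC on the event loop thread and fulfil a std::promise from the ".then()" continuation, since the RPC result is not available synchronously.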
