CPU-bound RPC command & optional API

pepij...@gmail.com

Mar 18, 2021, 7:18:50 AM
to Cap'n Proto
Hey,

I'm designing an RPC server for simulators that can run several different long-running commands, and I have a few design questions.

My current design is as follows.
Upon loading files to simulate, the simulator returns a list of unions of interfaces, representing all the simulation commands it supports.
Upon calling one of the commands, a reader interface is returned that allows streaming simulation results.
This currently happens in-thread, so reading big chunks will block the entire server.

So, first question: is there a nicer way to represent a thing that implements only a subset of functions? I could just define a mega interface and leave some methods unimplemented, but then the issue is how to discover which methods the server implements. Or maybe the correct approach is for each concrete implementation to use multiple inheritance from the smaller interfaces?

The next question is how to offload the simulation to a thread. I assumed this would be a very common task, but I can't find much in the docs. I found kj::Thread, but it's not clear to me how to tie that into the event loop promise API.

The final issue I'm thinking about: for *very* long-running simulations, a client disconnecting in the middle of a simulation that takes days or weeks becomes a real concern. This is basically level 2 of https://capnproto.org/rpc.html#protocol-features, but as far as I understand, the C++ implementation only supports level 1. What would be a good way to go about this? If level 2 is just around the corner, I can ignore the issue for a while, but maybe I need to manually store simulator references outside the connection and hand out tokens to them?

Regards,
Pepijn

pepij...@gmail.com

Mar 19, 2021, 11:10:43 AM
to Cap'n Proto
I think I figured out the first two problems.
Multiple inheritance worked out quite nicely; the only downside is that each simulator needs to be defined explicitly.
I think I found the correct way to do threading, which seems to be using kj::getCurrentThreadExecutor() to get a way to schedule callbacks on the event loop thread.
So I guess you'd create a promise/fulfiller pair, pass the fulfiller to the thread, and then fulfill it using the executor.
As with most of these things, they are quite puzzling at first but actually quite nice once you figure them out.

So the only problem left is how to handle disconnects in extremely long running processes.

Cheers,
Pepijn

Kenton Varda

Mar 19, 2021, 2:49:43 PM
to pepij...@gmail.com, Cap'n Proto
Yes, Executor is a good way to communicate between threads. FWIW you might also check out newCrossThreadPromiseAndFulfiller(), which was added very recently. Sometimes it's a better fit than kj::Executor.

"Level 2" turns out to be something that can't really be built into libcapnp itself because the design really depends on the execution environment in which your servers run. A system for saving capabilities and restoring them later needs to understand how to connect to -- and possibly even start up -- the appropriate server and storage system. So, for example, Sandstorm.io has implemented level 2 of the protocol in a way that is appropriate for it, but there ends up being not much that libcapnp itself can do to help. You can take a look at persistent.capnp to see a suggestion for how to structure a level 2 implementation, but it's not much more than a suggestion.

Ultimately, it's really up to you to come up with the right way for a client to request a token representing a particular running simulation, and then be able to connect back to that simulation later, probably by presenting the token to another RPC service.
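Here is one way to structure the token approach, sketched in plain C++. The names (`Simulation`, `SimulationRegistry`) are hypothetical and none of this is a Cap'n Proto API.

The registry lives in the server process, independent of any connection, so a simulation keeps running after its client disconnects. The intended wiring is:

- an RPC method calls `save()` to give the client a token;
- a separate method on the bootstrap interface calls `restore()` to reattach.

Tokens here come from `std::mt19937_64`, which is not cryptographically secure. A real service should use a proper CSPRNG so tokens cannot be guessed.

```cpp
#include <map>
#include <memory>
#include <mutex>
#include <random>
#include <string>

// Hypothetical handle to a running simulation; in practice this would own
// the worker thread and the results produced so far.
struct Simulation {
  std::string name;
};

// Hypothetical process-wide registry that outlives individual connections.
class SimulationRegistry {
 public:
  // Stores the simulation and returns an opaque token for reattaching later.
  std::string save(std::shared_ptr<Simulation> sim) {
    std::lock_guard<std::mutex> lock(mutex_);
    std::string token;
    do {
      token = randomToken();
    } while (sims_.count(token) != 0);  // avoid (unlikely) collisions
    sims_.emplace(token, std::move(sim));
    return token;
  }

  // Returns the simulation for a token, or an empty pointer if unknown.
  std::shared_ptr<Simulation> restore(const std::string& token) const {
    std::lock_guard<std::mutex> lock(mutex_);
    auto it = sims_.find(token);
    return it == sims_.end() ? std::shared_ptr<Simulation>() : it->second;
  }

  // Forgets a token, e.g. once the client has fetched the final results.
  void drop(const std::string& token) {
    std::lock_guard<std::mutex> lock(mutex_);
    sims_.erase(token);
  }

 private:
  // 32 hex digits from a non-cryptographic PRNG; caller must hold mutex_.
  std::string randomToken() {
    static const char kHex[] = "0123456789abcdef";
    std::uniform_int_distribution<int> digit(0, 15);
    std::string token;
    for (int i = 0; i < 32; ++i) token += kHex[digit(rng_)];
    return token;
  }

  mutable std::mutex mutex_;
  std::map<std::string, std::shared_ptr<Simulation>> sims_;
  std::mt19937_64 rng_{std::random_device{}()};
};
```

Entries are only removed by `drop()`, so a real system would also need a policy for expiring the tokens of finished or abandoned simulations.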

-Kenton


pepijn de vos

Mar 19, 2021, 4:40:58 PM
to Kenton Varda, Cap'n Proto
Thanks a lot, I'll look into the level 2 stuff.

newCrossThreadPromiseAndFulfiller looks perfect for what I'm trying to do.
I actually wanted more of a queue abstraction, which doesn't seem to be there AFAICT, so I'm trying to make one.

The basic idea is that when the consumer tries to pop from an empty queue, it creates a promise/fulfiller pair and gets back the promise. When the producer pushes a new value, it uses an executor to run on the consumer's event loop thread, and then checks whether the consumer is waiting: if so, it fulfills the promise; otherwise it just pushes onto the queue. It looks like once I get a version with the new cross-thread API, I can use that instead. For now I have something like this. Happy to make a PR once it works, if you think it's useful.

Pepijn

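#include <queue>
#include <kj/async.h>

template<class T>
class AsyncQueue
{
    public:
    // Construct on the consumer's thread; push() bodies run on its event loop.
    AsyncQueue() : exec(kj::getCurrentThreadExecutor()) {}

    // Consumer thread only. Assumes one consumer with at most one pending pop().
    kj::Promise<T> pop() {
        if (queue.empty()) {
            // Nothing queued: create the pair now and keep the fulfiller for push().
            auto paf = kj::newPromiseAndFulfiller<T>();
            waiter = kj::mv(paf.fulfiller);
            return kj::mv(paf.promise);
        } else {
            T val = kj::mv(queue.front());
            queue.pop();
            return kj::mv(val);
        }
    }

    // Any thread: the lambda is executed on the consumer's event loop thread.
    void push(T val) {
        exec.executeSync([val, this]() {
            // Only hand the value over if a pop() is actually pending.
            if (waiter.get() != nullptr && waiter->isWaiting()) {
                auto fulfiller = kj::mv(waiter);  // leaves waiter null
                fulfiller->fulfill(T(val));
            } else {
                queue.push(val);
            }
        });
    }

    private:
    std::queue<T> queue;
    kj::Own<kj::PromiseFulfiller<T>> waiter;  // set while a pop() is pending
    const kj::Executor &exec;
};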

pepijn de vos

Mar 21, 2021, 2:13:45 PM
to Kenton Varda, Cap'n Proto
I rewrote my queue for newCrossThreadPromiseAndFulfiller after installing Cap'n Proto from git, but now Python no longer works :(
I also installed pycapnp from git, but I'm getting the following error:
undefined symbol: _ZN2kj1_24TransformPromiseNodeBase16getInnerForTraceEv

Cheers,
Pepijn

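#include <queue>
#include <kj/async.h>
#include <kj/mutex.h>
#include <kj/refcount.h>

template<class T>
class AsyncQueue : public kj::AtomicRefcounted
{
    public:
    // Consumer thread only: the promise is tied to the calling thread's
    // event loop; only the fulfiller may be used from other threads.
    kj::Promise<T> pop() {
        auto state = mutex.lockExclusive();
        if (state->queue.empty()) {
            auto paf = kj::newCrossThreadPromiseAndFulfiller<T>();
            state->waiter = kj::mv(paf.fulfiller);
            return kj::mv(paf.promise);
        } else {
            T val = kj::mv(state->queue.front());
            state->queue.pop();
            return kj::mv(val);
        }
    }

    // May be called from any thread.
    void push(T val) {
        auto state = mutex.lockExclusive();
        if (state->waiter.get() != nullptr && state->waiter->isWaiting()) {
            auto fulfiller = kj::mv(state->waiter);  // leaves waiter null
            fulfiller->fulfill(kj::mv(val));
        } else {
            state->queue.push(kj::mv(val));
        }
    }

    private:
    struct State {
        std::queue<T> queue;
        // The fulfiller's type has changed between KJ versions, so take it from the factory.
        decltype(kj::newCrossThreadPromiseAndFulfiller<T>().fulfiller) waiter;
    };
    kj::MutexGuarded<State> mutex;
};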

Kenton Varda

Mar 22, 2021, 5:20:51 PM
to pepijn de vos, Cap'n Proto
Yeah, you probably need to recompile the python module against the new version of Cap'n Proto. Unfortunately, it's very hard to remain ABI-compatible across versions in C++, so KJ and Cap'n Proto don't try.

-Kenton