RPC protocol implementation reliability guarantees

bazu...@gmail.com

Jun 27, 2018, 5:46:28 AM
to Cap'n Proto
Hello!

Due to how TCP works, an application generally has no way of knowing whether all of its messages were received when the peer dies, unless there is an additional application-level protocol. Does the Cap'n Proto two-party RPC implementation guarantee delivery of messages (method calls) when working over TCP sockets? Simple experiments show that I get a "disconnected" exception if I kill the server in between method calls. I want to be extra sure that this is guaranteed to happen, and is not just a side effect of other communication over the socket that happened to let the implementation detect the broken connection.

I've tried to implement my own simple reliable data transfer using only the serialisation layer and writeMessage, but this proved quite tricky: the first writeMessage after the disconnect always succeeds, sometimes the second too, and those messages are lost forever, so retrying in the exception handler is not an option. Some queuing and confirmation mechanisms would have to be built in. I wanted to do this without affecting the message format, which seemed very hard. I cannot yet figure out how the Cap'n Proto RPC implementation handles this situation.

I would also be very grateful for any advice on implementing reliable message transfer with Cap'n Proto. Is there anything specific I should watch out for?

Thanks a lot!

Kenton Varda

Jun 28, 2018, 4:41:14 PM
to bazu...@gmail.com, Cap'n Proto
Hello,

It's not sufficient to simply confirm delivery of packets to the app. You also need to confirm that the app actually processed them. An app might receive the packets, but then crash, or suffer from a power outage, meaning that even though the packets were "delivered", they were still effectively lost.

The way this is handled in Cap'n Proto is by returning a response to the RPC. A client should always wait for the response and either handle or propagate errors.

Note that by default, TCP does not time out. So, if you send a request and then suffer a network partition (e.g. a cable was unplugged, or the remote machine had a power failure, or any other condition that causes the connection to fail without even sending a reset), you may end up waiting forever. There are two ways you can deal with this:

1. Set up your TCP socket yourself and set the appropriate socket options so that it times out after an appropriate length of time, then pass the socket to the Cap'n Proto RPC system. When the socket times out, you will receive a "disconnected" exception.
2. Apply a timeout at the application level, using kj::Timer::timeoutAfter(). This lets you set a timeout on a specific RPC call, or any other promise.

-Kenton

--
You received this message because you are subscribed to the Google Groups "Cap'n Proto" group.
To unsubscribe from this group and stop receiving emails from it, send an email to capnproto+unsubscribe@googlegroups.com.
Visit this group at https://groups.google.com/group/capnproto.

bazu...@gmail.com

Jun 29, 2018, 7:16:56 AM
to Cap'n Proto
Digging further, I've found that there is a fundamental difference between the single-device and network cases. There is a nice "TCP Puzzlers" article which illustrates the issue clearly. Luckily, I do not have to deal with network partitions because I'm using Cap'n Proto for IPC on a single device.

There is another point which I found by experimenting and which answers my initial concerns. Because writes to a closed TCP socket can succeed depending on the TCP state, it is possible to lose both the "call" and "finish" RPC messages if the server crashes while processing the call. Setting the linger timeout to 0 on the server side allows the RESET response to be sent immediately after the crash (most likely during the "finish" message), but this still does not solve the problem because of other corner cases. It seems that in the case of TCP sockets the logic of sending messages, waiting for responses and restoring the connection should be handled separately.

Note that all of this is irrelevant for unix domain sockets: as soon as the peer dies, the write throws "disconnected", so I can just restore the connection in the exception handler and try again. The overall control flow is much simpler.

I had wrongly assumed that Cap'n Proto somehow abstracts away the underlying transport layer.

Thanks for taking time to answer my questions!

Kenton Varda

Jun 29, 2018, 5:55:09 PM
to bazu...@gmail.com, Cap'n Proto
Even in the case of a unix socket, you can successfully write to the socket buffer without the other side ever reading it. E.g. perhaps the other side crashes immediately after the write completes.

It's a fundamental property of distributed systems that all communications are unreliable and it is ultimately impossible to abstract this away from applications. Not that that has stopped people from trying. :)
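
The unix socket case is easy to reproduce with a few lines of POSIX code. The sketch below (Linux assumed, because of `MSG_NOSIGNAL`; the struct and function names are made up for illustration) writes to one end of a unix socket pair, closes the other end without ever reading, and then writes again.

```cpp
#include <cerrno>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

struct WriteOutcome {
    ssize_t first;    // result of the write made while the peer was still open
    ssize_t second;   // result of the write made after the peer went away
    int secondErrno;  // errno of the second write, 0 if it did not fail
};

// Illustrative experiment: the "peer" never reads a single byte.
WriteOutcome writeToPeerThatNeverReads() {
    WriteOutcome out{-1, -1, 0};
    int fds[2];
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) != 0) return out;

    const char msg[] = "hello";
    // Lands in the kernel socket buffer, so it reports success.
    out.first = send(fds[0], msg, sizeof(msg), MSG_NOSIGNAL);

    close(fds[1]);  // the peer "crashes" with the message still unread

    // Only now does the sender learn that something is wrong.
    out.second = send(fds[0], msg, sizeof(msg), MSG_NOSIGNAL);
    if (out.second < 0) out.secondErrno = errno;

    close(fds[0]);
    return out;
}
```

The first `send` reports that every byte was written even though nobody ever reads them; only the second call fails. A successful write therefore says nothing about whether the message was processed.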

-Kenton

bazu...@gmail.com

Jul 4, 2018, 4:19:43 PM
to Cap'n Proto
After some experimenting I've managed to produce the following "reliable" client:

#include <ipctest.capnp.h>

#include <capnp/rpc-twoparty.h>
#include <kj/debug.h>  // KJ_DBG

#include <chrono>
#include <thread>

using namespace std;
using namespace capnp;
using namespace kj;

struct IpcClient
{
    AsyncIoContext ioContext;
    Own<NetworkAddress> networkAddress;
    Own<AsyncIoStream> connection;
    Own<TwoPartyClient> rpcClient;
    IpcTest::Client client;
    bool connected;

    IpcClient(StringPtr address) :
        ioContext{setupAsyncIo()},
        client{nullptr},
        connected{false}
    {
        networkAddress = ioContext.provider->getNetwork()
            .parseAddress(address).wait(ioContext.waitScope);
    }

    Promise<void> connect()
    {
        return networkAddress->connect().then(
            [this] (Own<AsyncIoStream>&& stream) -> Promise<void> {
                connected = true;
                connection = mv(stream);
                rpcClient = heap<TwoPartyClient>(*connection);
                client = rpcClient->bootstrap().castAs<IpcTest>();
                return READY_NOW;
            });
    }

    Promise<bool> reconnect(int tries = 1)
    {
        if (connected) return true;
        if (tries-- > 0) {
            return connect().then(
                [=] () -> Promise<bool> {
                    return true;
                },
                [=] (Exception&& ex) -> Promise<bool> {
                    KJ_DBG("connect failed", tries, ex);
                    if (tries >= 0) {
                        return ioContext.provider->getTimer()
                            .afterDelay(1*SECONDS)
                            .then([=] { return reconnect(tries); });
                    }
                    return false;
                });
        }
        return false;
    }

    template<typename Sender>
    Promise<void> call(Sender makeRequest, int retries = 1)
    {
        return reconnect(1).then([=] (bool reconnected) -> Promise<void> {
            if (!reconnected) return READY_NOW;
            return ioContext.provider->getTimer()
                .timeoutAfter(2*SECONDS, makeRequest())
                .catch_([=] (Exception&& ex) -> Promise<void> {
                    KJ_DBG("call failed", retries, ex);
                    if (ex.getType() == Exception::Type::DISCONNECTED) {
                        connected = false;
                        return ioContext.provider->getTimer().afterDelay(3*SECONDS)
                            .then([=] {
                                return reconnect(3).then(
                                    [=] (bool reconnected) mutable -> Promise<void> {
                                        if (reconnected && retries-- > 0)
                                            return call(makeRequest, retries);
                                        else
                                            return READY_NOW;
                                    });
                            });
                    }
                    return READY_NOW;
                });
        });
    }

    Promise<void> echo(int value)
    {
        return call([=] {
            auto request = client.echoRequest();
            request.setValue(value);
            return request.send().then(
                [] (Response<IpcTest::EchoResults>&& response) -> Promise<void> {
                    auto value = response.getValue();
                    KJ_DBG("echo response", value);
                    return READY_NOW;
                });
        });
    }
};

int main()
{
    StringPtr address{"127.0.0.1:3456"};
    IpcClient ipc{address};
    auto& ioContext = ipc.ioContext;

    // ipc.connect().wait(ioContext.waitScope);

    for (int i = 0; i < 5; i++) {
        KJ_DBG("sending", i);
        ipc.echo(i)
           .then([&] { return ioContext.provider->getTimer()
               .afterDelay(5*SECONDS); })
           .wait(ioContext.waitScope);
    }

    return 0;
}

The call wrapper does the following:
  1. On each call, it tries to connect if there is no existing connection, and abandons the call if that fails.
  2. It sends the call with a 2-second timeout, which should cancel the call once the timeout expires.
  3. If the call fails with a "disconnected" exception:
    1. Wait 3 seconds.
    2. Try to reconnect up to 3 times, once per second.
    3. If the reconnection succeeds, repeat the request with the same parameters.
    4. If the repeated request fails again, abandon it (to handle a continuously crashing server).
The idea was to provide an example of real-world usage which is fault tolerant, but the result ended up looking quite messy. I am not very familiar with promise-style programming. Am I doing this right? Any ideas for improvements are welcome!

John

Jul 4, 2018, 5:44:27 PM
to capn...@googlegroups.com
You never find out a communication has failed, except you do not get an
ack in a timely manner.

And you always have to go ahead doing stuff on the assumption that an
ack is coming.

If an ack is unreasonably delayed, you send a nack or a resend.

If a few resends at ever increasing intervals do not work, you have a
failure condition that affects work that has been done quite some time
before you discovered the failure condition.

This is incompatible with the way you would like to write a program, so
you ask for a security guarantee, that will not in fact be forthcoming.

You handle this by wrapping unreliable interactions in transactions, or
something that works like transactions, not by reliability guarantees.

You also handle this by looking at the task as collecting evidence of
the state of other machines.  "What do I locally know about what the far
machine knew at time x?"

You find you need probabilistic evidence "Chances are that the other
machine will have seen that data before I send it this data" and strong
evidence  "The other machine has acked this data, thus my transaction is
complete."  You are building a data structure that tells you about the
known state of distant machines.
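
As a small illustration of "resends at ever increasing intervals", here is a sketch of a capped doubling schedule. The function name `resendSchedule`, the doubling factor and the cap are assumptions made for the example; nothing here comes from Cap'n Proto.

```cpp
#include <algorithm>
#include <chrono>
#include <vector>

// Hypothetical helper: the delay to wait before each resend attempt.
// Each delay is twice the previous one, but never larger than `cap`.
std::vector<std::chrono::milliseconds> resendSchedule(
        std::chrono::milliseconds first,
        std::chrono::milliseconds cap,
        int attempts) {
    std::vector<std::chrono::milliseconds> delays;
    std::chrono::milliseconds delay = first;
    for (int i = 0; i < attempts; ++i) {
        delays.push_back(delay);
        delay = std::min<std::chrono::milliseconds>(delay * 2, cap);
    }
    return delays;
}
```

Once the schedule is exhausted without an ack, the caller has reached the failure condition described above and has to decide what to do about the work that depended on the unacknowledged message.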

bazu...@gmail.com

Jul 5, 2018, 4:24:20 PM
to Cap'n Proto
Thank you John for the valuable insight! It solidifies my understanding so far.

In my example above I assume that if there is no answer within 2 seconds the server is probably too busy, but if the connection itself is not broken the request will probably still succeed. In my test case I see a "finish" message from the client after the timeout, which is probably an attempt to cancel the request, but it has no effect because the server is still stuck in the handler function. After the server completes the request it sends the "return" message, but the client ignores it, AFAICS. And it seems there is no way to detect this late return.

The way to handle this situation is to try again with increasing timeouts, as per your suggestion, until the answer is received in time. This actually somewhat increases the load on the server. Also, to avoid serving a request twice, the messages have to be idempotent, AFAIK. This is doable if I have my own message format with an ID or something similar, but I do not see how it could be solved with the capnp RPC protocol without changing the interface definitions.
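
To make the idempotency idea concrete, here is a sketch of a handler that deduplicates by a caller-supplied request ID. The class name, the `"processed:"` result and the in-memory map are all assumptions for illustration; with Cap'n Proto RPC the ID would have to be an extra method parameter, which is exactly the interface change mentioned above.

```cpp
#include <cstdint>
#include <string>
#include <unordered_map>

// Hypothetical server-side handler: a retried request with the same ID
// gets the stored result back instead of being executed a second time.
class DedupingHandler {
public:
    std::string handle(std::uint64_t requestId, const std::string& payload) {
        auto it = results_.find(requestId);
        if (it != results_.end()) return it->second;  // duplicate: replay the answer

        ++executions_;  // the real side effect would happen here
        std::string result = "processed:" + payload;
        results_.emplace(requestId, result);
        return result;
    }

    int executions() const { return executions_; }

private:
    std::unordered_map<std::uint64_t, std::string> results_;
    int executions_ = 0;
};
```

Note that this table lives in memory, so it only protects against retries after a timeout. If the server crashes, the table is gone too, unless it is persisted together with the side effect.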

This leads me to think that I'm approaching the problem from the wrong side. Actually, I've found the following comment in rpc.h:

// The RPC implementation sits on top of an implementation of `VatNetwork`.  The `VatNetwork`
// determines how to form connections between vats -- specifically, two-way, private, reliable,
// sequenced datagram connections.  The RPC implementation determines how to use such connections
// to manage object references and make method calls.

Does "reliable" here mean that connection state tracking, retries, etc. are supposed to be handled in the layer below RPC by something like ZeroMQ (implemented by a custom VatNetwork subclass), or is it about lower-level requirements, such as packets within a message not being reordered?

Actually, for many use cases it would probably be much simpler to just write some serialized data to a ZeroMQ REQ/REP socket, but full RPC looks nice and convenient; it is just unclear to me how to use it in a "production" setup. Does everyone who uses Cap'n Proto RPC have some higher-level layer above it with state tracking, transaction semantics and such, and maybe also custom VatNetwork implementations, or am I missing something obvious here? Sorry for the silly questions, I just really like this library and am trying to figure out how to get the most out of it. :)

Kenton Varda

Jul 9, 2018, 3:59:57 PM
to bazu...@gmail.com, Cap'n Proto
On Thu, Jul 5, 2018 at 1:24 PM, <bazu...@gmail.com> wrote:
// The RPC implementation sits on top of an implementation of `VatNetwork`.  The `VatNetwork`
// determines how to form connections between vats -- specifically, two-way, private, reliable,
// sequenced datagram connections.  The RPC implementation determines how to use such connections
// to manage object references and make method calls.

Does the "reliable" here means that the connection state tracking, retries, etc. are supposed to be handled on the layer below RPC by something like ZeroMQ (implemented by custom VatNetwork subclass) or it is about lower level requirements such as packets within message are not reordered and such?

"Reliable" here means in the sense of TCP: each message will be automatically re-transmitted until either it is delivered or the connection fails.
 
Actually for many use cases it will probably much simpler to just write some serialized data to ZeroMQ REQREP socket but full RPC looks nice and convenient it is just unclear for me how to use it in "production" setup. Is everyone who uses CapnProto RPC has some higher level layer above it with state tracking, transaction semantics and such and maybe also custom VanNetwork implementations or I am missing something obvious here? Sorry for silly questions, I just really like this library and trying figure out how to get the most out of it. :)

I think you're expecting something here that isn't really possible. TCP is already a reliable transport -- as reliable as it can be. It already re-transmits packets after a timeout. Adding timeouts and retries on top of TCP will not make your transport more reliable.

It only makes sense to try to handle errors if there is some application-specific strategy you can use that TCP itself couldn't have. Cap'n Proto can't do this for you, because it depends on your app. Are there multiple servers you could be connecting to? If so, then it might make sense to try a different server if the first one times out. But Cap'n Proto doesn't know about your app's replication strategy, so it can't do this automatically. Retrying to the same server after a timeout probably isn't a good idea -- it will only "dig the hole deeper", overloading that server.

I find it's usually easiest to handle errors at a high level rather than a low level. Usually, it's much more obvious what to do about a problem at a higher level. In fact, I find in the vast majority of situations, the right way to handle an error is simply to abort and log the error. You can then monitor your logs and decide if error rates are too high in some area, and then focus on implementing better error handling strategies there.

-Kenton

bazu...@gmail.com

Jul 11, 2018, 7:10:27 AM
to Cap'n Proto


On Monday, 9 July 2018 21:59:57 UTC+2, Kenton Varda wrote:
On Thu, Jul 5, 2018 at 1:24 PM, <bazu...@gmail.com> wrote:
Actually for many use cases it will probably much simpler to just write some serialized data to ZeroMQ REQREP socket but full RPC looks nice and convenient it is just unclear for me how to use it in "production" setup. Is everyone who uses CapnProto RPC has some higher level layer above it with state tracking, transaction semantics and such and maybe also custom VanNetwork implementations or I am missing something obvious here? Sorry for silly questions, I just really like this library and trying figure out how to get the most out of it. :)

I think you're expecting something here that isn't really possible. TCP is already a reliable transport -- as reliable as it can be. It already re-transmits packets after a timeout. Adding timeouts and retries on top of TCP will not make your transport more reliable.

OK. Maybe "reliable" was the wrong word. I want to guarantee that all calls eventually reach the server even if it crashes during or between calls. Unavailability is not a concern because the server will be restarted immediately. I just need to keep trying. High load is a separate issue which can be ignored for now.
 
It only makes sense to try to handle errors if there is some application-specific strategy you can use that TCP itself couldn't have. Cap'n Proto can't do this for you, because it depends on your app. Are there multiple servers you could be connecting to? If so, then it might make sense to try a different server if the first one times out. But Cap'n Proto doesn't know about your app's replications strategy, so it can't do this automatically. Retrying to the same server after a timeout probably isn't a good idea -- it will only "dig the hole deeper", overloading that server.

This makes sense in the general case, but I'm using Cap'n Proto for IPC on a single device and do not need load balancing, etc.
 
I find it's usually easiest to handle errors at a high level rather than a low level. Usually, it's much more obvious what to do about a problem at a higher level. In fact, I find in the vast majority of situations, the right way to handle an error is simply to abort and log the error. You can then monitor your logs and decide if error rates are too high in some area, and then focus on implementing better error handling strategies there.

Sorry if I've approached this from too far away. It turned out to be a much harder problem than I had anticipated. So, assuming that I'm already implementing an error handling strategy in an attempt not to lose messages, I have a few more practical questions.

In the event of a disconnect all capabilities are lost, all in-flight calls are dropped, etc. All RPC state is effectively lost. Makes sense. Am I right that the only handling strategy here is to connect again with NetworkAddress, recreate the TwoPartyClient from the returned AsyncIoStream and bootstrap the root capability again from the client? There are mentions of persistent capabilities and "sturdy refs" in the docs and sources, but I have not figured out what exactly they are, how to use them, or whether they would be of any help in this case.

To handle disconnects I need to attach a "catch_" to every request send call, which tries to reconnect (and bootstrap, etc.) on error and repeats the call. I've tried to create a generic call wrapper function, but there is a problem with it: it is impossible to store a request or to send the same request twice, so the only option is to capture the whole request-generating code in a lambda and repeat that. I need to copy all the data into the lambda and rebuild the request every time before sending. This looks kind of awkward and inefficient. You can see an attempt in the code I've posted above. Maybe I've missed some lower-level API which could help with that? Does repeating the same request at the application level make sense, or am I trying to do something silly here?

bazu...@gmail.com

Jul 11, 2018, 1:13:06 PM
to Cap'n Proto


Actually, after thinking some more, it makes sense that requests need to be created again, because they are constructed from the client capability handle and are inherently tied to it. After the disconnection the capability becomes invalid, so the repeated requests need to be created with the new capability instance after the bootstrap.

Is there some way to restore a broken connection without recreating all objects anew and losing all the state? In theory, after the in-progress calls fail there should be no leftover data to read or write. The AsyncIoStream is just a wrapper for a file descriptor. Does it matter for the code above that it was replaced? Or is the only option here to never let the Cap'n Proto RPC system see the broken connection, by using something like ZeroMQ sockets underneath?

Kenton Varda

Jul 11, 2018, 1:16:03 PM
to bazu...@gmail.com, Cap'n Proto
On Wed, Jul 11, 2018 at 4:10 AM, <bazu...@gmail.com> wrote:
This makes sense in a general case but I'm using CapnProto for an IPC on a single device, do not need to load balance, etc.

Ohhh, this simplifies things considerably. For local IPC, you can absolutely rely on the kernel to tell you if the service crashes. You will always get a DISCONNECTED exception immediately. So you only need to catch those and retry. You don't need to use timeouts.

In the event of disconnect all capabilities are lost, all in-flight calls are dropped, etc. All RPC state is effectively lost. Makes sense. Am I right that the only handling strategy here is to connect again with NetworkAddress, recreate the TwoPartyClient from the returned AsyncIoStream and bootstrap the root capability again from the client?

Yes.
 
There are mentions of persistent capabilities and "sturdy refs" in docs and sources. But I've not figured out what is this exactly, how to use them and if it will be any help at all in this case.

SturdyRefs are sort of a "design pattern". It's not something built into the library, since implementing them requires deep knowledge of higher-level details. The idea is that you can call save() on a capability and receive back some sort of token that you can use to get that capability again in the future. But, what those tokens should look like and how exactly you restore them is not specified by Cap'n Proto.
 
To handle disconnects I need to attach "catch_" to every request send call which tries to reconnect (and bootstrap, etc.) on the error and repeat the call. I've tried to create a generic call wrapper function but there is a problem with it: it is impossible to store requests nor send the same request twice, so the only option is to capture the whole request generating code into a lambda and repeat that. I need to copy all the data into the lambda and rebuild the request every time before sending. Looks kind of awkward and inefficient. You can see an attempt in the code I've posted above. Maybe I've missed some lower level API which could help with that? Does the repeating the same request from the application level makes sense or I'm trying to do something silly here?

I generally recommend catching the DISCONNECTED exception somewhere higher-level in your program, not at every single call site. I think this is where your complexity is coming from. Try to think about what is the overall operation you are performing (which may consist of a sequence of multiple calls). Add an exception handler to the overall operation which restarts the whole thing.

The error case should be rare, so you don't need to worry too much about it being efficient.
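
A rough sketch of that shape, written synchronously with plain C++ exceptions to keep it short. The `Disconnected` type and `runWithReconnect` are hypothetical stand-ins, not Cap'n Proto APIs; in real KJ code the same structure would be expressed with promises and a `catch_()` that checks for `kj::Exception::Type::DISCONNECTED`.

```cpp
#include <stdexcept>

// Hypothetical stand-in for a DISCONNECTED error.
struct Disconnected : std::runtime_error {
    using std::runtime_error::runtime_error;
};

// Runs the whole operation; on Disconnected, reconnects and restarts it from
// the beginning. After `maxRestarts` restarts the exception is rethrown.
template <typename Reconnect, typename Operation>
auto runWithReconnect(Reconnect&& reconnect, Operation&& operation, int maxRestarts)
        -> decltype(operation()) {
    for (int restarts = 0;; ++restarts) {
        try {
            return operation();  // may consist of several calls in sequence
        } catch (const Disconnected&) {
            if (restarts >= maxRestarts) throw;
            reconnect();         // e.g. connect again and re-bootstrap
        }
    }
}
```

There is one handler for the whole operation, so the individual call sites inside `operation` need no error handling of their own. Whether restarting from the top is safe still depends on what the operation does.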


FWIW, here's a utility class from the Sandstorm codebase that helps with handling disconnects:


It creates a capability which proxies calls, but when any call fails with DISCONNECTED, it initiates a reconnect callback and blocks subsequent calls until the reconnect completes. However, note that it does not automatically retry the call which threw DISCONNECTED, since only the application knows what kinds of calls are safe to retry. So the app still needs to catch and retry, but at least the exception handler doesn't need to figure out for itself how to reconnect.

This utility should probably be moved to the Cap'n Proto library at some point. I think there's also room for some utility code for automatically retrying an operation on DISCONNECTED exceptions.

-Kenton

Kenton Varda

Jul 11, 2018, 1:22:50 PM
to bazu...@gmail.com, Cap'n Proto
On Wed, Jul 11, 2018 at 10:13 AM, <bazu...@gmail.com> wrote:
Is there some way to restore broken connection without recreating all objects a new and loosing all the state? In theory after the in-progress calls fail there should not be leftover data to read or write. The AsyncIoStream is just a wrapper for a file descriptor. Does it matter for the code above that it was replaced? Or the only option here is to never let the CapnProto RPC system see the broken connection by using something like ZeroMQ sockets underneath?

The hard part is not restoring the connection, but rather restoring the server-side state. If your server crashed, then all the capabilities you had on the connection point to objects that no longer exist. You need to instruct the server on how to rebuild those. Hence you have to start fresh.

Since you said your use case is IPC, the only circumstance where you will get disconnected is if the server crashed. So, some sort of approach that allows restoring an existing session wouldn't help, because the server-side state is gone.

In theory you could design a proxy/membrane that records every call that returned a capability, so that it can replay them after disconnect in order to reconstruct the same capabilities. However, whether or not this actually works depends on the application -- in some cases, replaying the exact same call sequence may be the wrong thing to do. For example, say the first time you connected, you made a call to create a new file with some name. On subsequent connections, you want to re-open the existing file rather than create a new one. This is why it's not really possible for Cap'n Proto to automate restoring connections...
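
The file example can be sketched in a few lines. `FileRegistry` is a made-up stand-in for server state that survives the crash (for instance, files on disk); it only illustrates why a blind replay of `create` is wrong while an "open or create" style call is safe to repeat.

```cpp
#include <map>
#include <stdexcept>
#include <string>

// Hypothetical model of state that outlives a server crash.
class FileRegistry {
public:
    // Not safe to replay: the second identical call fails.
    int create(const std::string& name) {
        if (files_.count(name) != 0) {
            throw std::runtime_error("already exists: " + name);
        }
        return add(name);
    }

    // Safe to replay: repeating the call yields the same file.
    int openOrCreate(const std::string& name) {
        auto it = files_.find(name);
        if (it != files_.end()) return it->second;
        return add(name);
    }

private:
    int add(const std::string& name) {
        int id = nextId_++;
        files_.emplace(name, id);
        return id;
    }

    std::map<std::string, int> files_;
    int nextId_ = 0;
};
```

A generic replaying proxy cannot know that the second connection should call `openOrCreate` (or `open`) rather than `create`; only the application does.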

-Kenton

bazu...@gmail.com

Jul 12, 2018, 5:55:23 PM
to Cap'n Proto
On Wednesday, 11 July 2018 19:16:03 UTC+2, Kenton Varda wrote:
On Wed, Jul 11, 2018 at 4:10 AM, <bazu...@gmail.com> wrote:
This makes sense in a general case but I'm using CapnProto for an IPC on a single device, do not need to load balance, etc.

Ohhh, this simplifies things considerably. For local IPC, you can absolutely rely on the kernel to tell you if the service crashes. You will always get a DISCONNECTED exception immediately. So you only need to catch those and retry. You don't need to use timeouts.

Only for unix sockets though. For TCP the first syscall after the disconnect always succeeds and the RESET packet is received. Only the next syscall will return "connection reset". Luckily CapnProto RPC does at least 2 writeMessage to the socket (call and finish) and also tries to read responses so there are enough actual syscalls to detect the disconnect it seems.

 
There are mentions of persistent capabilities and "sturdy refs" in docs and sources. But I've not figured out what is this exactly, how to use them and if it will be any help at all in this case.

SturdyRefs are sort of a "design pattern". It's not something built into the library, since implementing them requires deep knowledge of higher-level details. The idea is that you can call save() on a capability and receive back some sort of token that you can use to get that capability again in the future. But, what those tokens should look like and how exactly you restore them is not specified by Cap'n Proto.

Thanks! This looks useful for security stuff but not a concern for me currently.
 
 
To handle disconnects I need to attach "catch_" to every request send call which tries to reconnect (and bootstrap, etc.) on the error and repeat the call. I've tried to create a generic call wrapper function but there is a problem with it: it is impossible to store requests nor send the same request twice, so the only option is to capture the whole request generating code into a lambda and repeat that. I need to copy all the data into the lambda and rebuild the request every time before sending. Looks kind of awkward and inefficient. You can see an attempt in the code I've posted above. Maybe I've missed some lower level API which could help with that? Does the repeating the same request from the application level makes sense or I'm trying to do something silly here?

I generally recommend catching the DISCONNECTED exception somewhere higher-level in your program, not at every single call site. I think this is where your complexity is coming from. Try to think about what is the overall operation you are performing (which may consistent of a sequence of multiple calls). Add an exception handler to the overall operation which restarts the whole thing.

Thanks for the suggestion! I've thought about this, but my use case is very basic: I'm just gathering some data in one process and calling onSomeEvent(data) or onSomeOtherEvent(data) on the other. I have multiple processes which communicate like that and can potentially crash or restart at any time. Since I need to repeat requests, I have not managed to think of anything better so far than wrapping the whole request generation in a lambda. Maybe RPC is overkill for this particular scenario, but I thought it would be useful later. Better to start with a simpler case.
 

FWIW, here's a utility class from the Sandstorm codebase that helps with handling disconnects:


It creates a capability which proxies calls, but when any call fails with DISCONNECTED, it initiates a reconnect callback and blocks subsequent calls until the reconnect completes. However, note that it does not automatically retry the call which threw DISCONNECTED, since only the application knows what kinds of calls are safe to retry. So the app still needs to catch and retry, but at least the exception handler doesn't need to figure out for itself how to reconnect.

Thank you! This is much closer to the solution I've envisioned. It also demonstrates advanced capability APIs. I've tried to integrate this CapRedirector and am having some problems.

I'm using it like this:

client = capnp::Capability::Client{kj::heap<CapRedirector>([this] () {
    return networkAddress->connect().then(
        [this] (kj::Own<kj::AsyncIoStream>&& stream) {
            KJ_DBG("connected");
            connection = mv(stream);
            twoPartyClient = kj::heap<capnp::TwoPartyClient>(*connection);
            return twoPartyClient->bootstrap();
        });
})}.castAs<typename T::Client>();

If the server is not restarted immediately after the call fails with the disconnected exception, the next error will be "Connection refused", which is expected. But even after the server is started, the next call also fails with the same "Connection refused" exception, so the first call after the restart is also lost. This is because the reconnect attempt happens in the exception handler.

I've tried to adapt this for my use with a queuing capability and reconnecting, but could not figure out how to use CapRedirector's second constructor form for experiments. The main problem is that I have nothing to call setTarget on after I create a client from it, because the capability client's constructor takes Own<Server>&&, so my old reference becomes invalid. But I need to create the client instance to make requests. Can you advise?

Also, I could not quite figure out what problems the iteration count and the dummy ping are solving.
 
This utility should probably be moved to the Cap'n Proto library at some point. I think there's also room for some utility code for automatically retrying an operation on DISCONNECTED exceptions.

-Kenton
 
I've not really considered the passed capabilities or server side state. In my case repeating the last call is not a problem but in the general case you are absolutely right. Maybe solving it automatically is not feasible but some additional facilities in the library such as CapRedirector will definitely help.

Kenton Varda

Jul 14, 2018, 1:19:24 AM
to bazu...@gmail.com, Cap'n Proto
On Thu, Jul 12, 2018 at 2:55 PM, <bazu...@gmail.com> wrote:
On Wednesday, 11 July 2018 19:16:03 UTC+2, Kenton Varda wrote:
On Wed, Jul 11, 2018 at 4:10 AM, <bazu...@gmail.com> wrote:
This makes sense in the general case, but I'm using CapnProto for IPC on a single device, so I do not need load balancing, etc.

Ohhh, this simplifies things considerably. For local IPC, you can absolutely rely on the kernel to tell you if the service crashes. You will always get a DISCONNECTED exception immediately. So you only need to catch those and retry. You don't need to use timeouts.

Only for unix sockets though. For TCP, the first syscall after the disconnect always succeeds, and an RST packet is received in response. Only the next syscall will return "connection reset". Luckily, CapnProto RPC performs at least two writeMessage calls on the socket (call and finish) and also tries to read responses, so it seems there are enough actual syscalls to detect the disconnect.

Eh? You should get a notification of disconnect without doing any new writes at all. When the other side crashes, the OS should proactively flag the socket as readable, and then reading should produce an error, which should cause Cap'n Proto to error out all pending RPCs. This should be the case even for TCP sockets.

Is that not what you're seeing? If not, that is a bug. What OS are you using?

If the server is not restarted immediately after the call fails with the disconnected exception, the next error will be "Connection refused", which is expected. But even after the server is started, the next call also fails with the same "Connection refused" exception, so the first call after the restart is also lost. This is because the reconnect attempt happens in the exception handler.

You will need to design your reconnect callback such that it waits until the server is reachable. You could maybe use a loop with exponential backoff (after each failure, pause for twice as long as the previous pause).

CapRedirector doesn't do this internally because it depends on the use case -- e.g. in Sandstorm the reconnect callback is sometimes responsible for actually starting the destination server, not just connecting to it.
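
The doubling rule described above ("pause for twice as long as the previous pause") can be written down as a small schedule function. This is a minimal sketch in plain C++, not KJ code; the name `backoffDelay`, the 100 ms starting value, and the 5 s upper bound are assumptions chosen for illustration. A cap is a common addition so that the pause does not grow without limit.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>

// Delay to wait before retry number `attempt` (0-based): the initial delay,
// doubled once per previous failure and clamped to `maxDelay`.
// The default values are illustrative assumptions, not library defaults.
inline std::chrono::milliseconds backoffDelay(
        unsigned attempt,
        std::chrono::milliseconds initial = std::chrono::milliseconds(100),
        std::chrono::milliseconds maxDelay = std::chrono::milliseconds(5000)) {
    assert(initial.count() > 0 && maxDelay >= initial);
    std::chrono::milliseconds delay = initial;
    // Stop doubling once the cap is reached, so large attempt counts
    // cannot overflow the duration.
    for (unsigned i = 0; i < attempt && delay < maxDelay; ++i) {
        delay *= 2;
    }
    return std::min(delay, maxDelay);
}
```

With these defaults the pauses are 100 ms, 200 ms, 400 ms and so on, until they level off at 5 s.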

-Kenton

bazu...@gmail.com

Jul 19, 2018, 11:17:32 AM
to Cap'n Proto
On Saturday, 14 July 2018 07:19:24 UTC+2, Kenton Varda wrote:
On Thu, Jul 12, 2018 at 2:55 PM, <bazu...@gmail.com> wrote:
Only for unix sockets though. For TCP, the first syscall after the disconnect always succeeds, and an RST packet is received in response. Only the next syscall will return "connection reset". Luckily, CapnProto RPC performs at least two writeMessage calls on the socket (call and finish) and also tries to read responses, so it seems there are enough actual syscalls to detect the disconnect.

Eh? You should get a notification of disconnect without doing any new writes at all. When the other side crashes, the OS should proactively flag the socket as readable, and then reading should produce an error, which should cause Cap'n Proto to error out all pending RPCs. This should be the case even for TCP sockets.

I was referring to my first attempt at using it without RPC. I was just happily writing messages to the socket, hoping that I would receive an error on disconnect. After some research I've now figured out that watching for a 0-byte read seems to be the only way to detect that the other side has closed its end. Sorry for the confusion.
 
You will need to design your reconnect callback such that it waits until the server is reachable. You could maybe use a loop with exponential backoff (after each failure, pause for twice as long as the previous pause).

CapRedirector doesn't do this internally because it depends on the use case -- e.g. in Sandstorm the reconnect callback is sometimes responsible for actually starting the destination server, not just connecting to it.

Sorry for a stupid question, but how could I wait in the reconnect? It actually runs inside the disconnect event callback. I could not find a simple fix; maybe I'm missing something obvious. I will try to write something similar to CapRedirector that fits my use case better, but I wanted to be sure that I understand how it works first.

Kenton Varda

Jul 19, 2018, 12:10:16 PM
to bazu...@gmail.com, Cap'n Proto
On Thu, Jul 19, 2018 at 8:17 AM, <bazu...@gmail.com> wrote:
I was referring to my first attempt at using it without RPC. I was just happily writing messages to the socket, hoping that I would receive an error on disconnect. After some research I've now figured out that watching for a 0-byte read seems to be the only way to detect that the other side has closed its end. Sorry for the confusion.

Yeah, I suppose you won't get proactive notification if you aren't waiting on any kind of event. At the kj::AsyncIoStream level, you can do a read() with minBytes of 1. That will fail with a DISCONNECTED exception if the connection drops.
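
Outside of KJ, the same "wait for readability, then read" idea can be shown with plain POSIX calls. This is a sketch for POSIX systems only; the function name `peerHasClosed` is hypothetical. It waits until the OS flags the socket as readable and then peeks at one byte: a result of 0 means the peer closed its end, and an error such as ECONNRESET means the connection was reset.

```cpp
#include <cassert>
#include <cerrno>
#include <poll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

// Returns true if the peer of socket `fd` has closed or reset the connection.
// Waits at most `timeoutMs` milliseconds for the socket to become readable.
inline bool peerHasClosed(int fd, int timeoutMs) {
    assert(fd >= 0);
    pollfd pfd{};
    pfd.fd = fd;
    pfd.events = POLLIN;
    int ready = poll(&pfd, 1, timeoutMs);
    if (ready <= 0) return false;  // timeout or poll error: nothing known yet

    char byte;
    // MSG_PEEK leaves any real data in the socket buffer for the normal reader.
    ssize_t n = recv(fd, &byte, 1, MSG_PEEK);
    if (n == 0) return true;       // orderly shutdown by the peer (EOF)
    if (n < 0) return errno != EINTR && errno != EAGAIN;  // e.g. ECONNRESET
    return false;                  // data is pending; the connection is alive
}
```

No write is needed for this check, which matches the point made above that a disconnect can be noticed purely from the read side.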
 
 
Sorry for a stupid question, but how could I wait in the reconnect? It actually runs inside the disconnect event callback. I could not find a simple fix; maybe I'm missing something obvious. I will try to write something similar to CapRedirector that fits my use case better, but I wanted to be sure that I understand how it works first.

You need to do a loop of trying to connect and, if it fails, wait some time, probably with exponential backoff.

Something like (not tested):

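    #include <kj/async-io.h>  // kj::NetworkAddress, kj::AsyncIoStream, kj::Timer

    // Tries to connect. On a DISCONNECTED failure (for example, while the
    // server is not reachable), waits pauseTime and tries again, doubling the
    // pause after each failure.
    kj::Promise<kj::Own<kj::AsyncIoStream>> keepTryingToConnect(
        kj::Timer& timer, kj::NetworkAddress& addr, kj::Duration pauseTime = 100 * kj::MILLISECONDS) {
      return kj::evalNow([&]() { return addr.connect(); })
          .catch_([&timer, &addr, pauseTime](kj::Exception&& e)
              -> kj::Promise<kj::Own<kj::AsyncIoStream>> {
        if (e.getType() == kj::Exception::Type::DISCONNECTED) {
          // Pause, then retry with twice the pause.
          return timer.afterDelay(pauseTime).then([&timer, &addr, pauseTime]() {
            return keepTryingToConnect(timer, addr, pauseTime * 2);
          });
        } else {
          // Any other failure is not retried; propagate it to the caller.
          return kj::mv(e);
        }
      });
    }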

-Kenton

bazu...@gmail.com

Jul 19, 2018, 5:26:24 PM
to Cap'n Proto
Thanks! I also thought that it might be something similar but it is not going to work because "connect" returns a promise of AsyncIoStream which is then used to construct TwoPartyClient and bootstrap. I do not see how this could be done from inside another event (during the dispatchCall) without a trip to the event loop.

So, while experimenting I came up with the following:

#include <capnp/capability.h>
#include <kj/async.h>
#include <kj/debug.h>
#include <kj/function.h>
#include <kj/refcount.h>

class CapReconnector : public capnp::Capability::Server,
                       public kj::Refcounted
{
public:
    CapReconnector(kj::Function<capnp::Capability::Client()> reconnect);

    kj::Promise<void> dispatchCall(
        uint64_t interfaceId, uint16_t methodId,
        capnp::CallContext<capnp::AnyPointer, capnp::AnyPointer> context) override;

private:
    bool connected;
    capnp::Capability::Client client;
    kj::Function<capnp::Capability::Client()> reconnect;
};

CapReconnector::CapReconnector(kj::Function<capnp::Capability::Client()> reconnect) :
    connected{false}, client{reconnect()}, reconnect{kj::mv(reconnect)}
{ }

kj::Promise<void> CapReconnector::dispatchCall(
    uint64_t interfaceId, uint16_t methodId,
    capnp::CallContext<capnp::AnyPointer, capnp::AnyPointer> context)
{
    if (!connected) {
        client = reconnect();
    }

    capnp::AnyPointer::Reader params = context.getParams();
    auto req = client.typelessRequest(interfaceId, methodId, params.targetSize());
    req.set(params);

    return req.send().then(
        [this, context] (auto&& response) mutable -> kj::Promise<void> {
            connected = true;
            context.initResults(response.targetSize()).set(response);
            return kj::READY_NOW;
        },
        [this] (kj::Exception&& e) -> kj::Promise<void> {
            if (e.getType() == kj::Exception::Type::DISCONNECTED) {
                connected = false;
            }
            return kj::mv(e);
        });
}

It looks nice and simple, but it works only as long as there is a wait after every call. If there are multiple pending calls in flight (detached promises, for example) and the server disconnects, it will try to call reconnect for all of them. It still might be useful as an example for someone. There is a QueuedClient hidden behind one of the capability client constructors which will probably help with solving this problem. At least there is progress.
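
One possible way to keep several in-flight calls from each triggering their own reconnect is a generation counter: every call remembers which connection generation it was sent on, and a failure only causes a reconnect if that generation is still the current one. The sketch below is plain C++ and only illustrates the bookkeeping. It is not taken from CapRedirector or QueuedClient, and the class name `ReconnectGate` and its methods are hypothetical.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <utility>

// Ensures that N calls failing on the same dead connection cause exactly one
// reconnect instead of N.
class ReconnectGate {
public:
    explicit ReconnectGate(std::function<void()> reconnect)
        : reconnect_(std::move(reconnect)) {
        assert(reconnect_ != nullptr);
    }

    // Call when sending a request; store the returned value with the call.
    std::uint64_t currentGeneration() const { return generation_; }

    // Call when a request sent on `sentGeneration` fails with a disconnect.
    // Returns true if this report triggered a reconnect, false if the report
    // was stale because an earlier failure already handled it.
    bool onDisconnected(std::uint64_t sentGeneration) {
        if (sentGeneration != generation_) return false;
        ++generation_;  // later reports for the old generation become stale
        reconnect_();
        return true;
    }

private:
    std::function<void()> reconnect_;
    std::uint64_t generation_ = 0;
};
```

In a dispatchCall-style proxy, the generation would be captured alongside each forwarded request and passed to `onDisconnected` from the error handler. Whether the failed call itself is then retried remains an application decision.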