--
You received this message because you are subscribed to the Google Groups "Cap'n Proto" group.
To unsubscribe from this group and stop receiving emails from it, send an email to capnproto+unsubscribe@googlegroups.com.
Visit this group at https://groups.google.com/group/capnproto.
To unsubscribe from this group and stop receiving emails from it, send an email to capnproto+unsubscribe@googlegroups.com.
To unsubscribe from this group and stop receiving emails from it, send an email to capnproto+unsubscribe@googlegroups.com.
#include <chrono>
#include <thread>

#include <capnp/rpc-twoparty.h>

#include <ipctest.capnp.h>

using namespace std;
using namespace capnp;
using namespace kj;
// Client for the IpcTest interface that connects lazily and, when a call fails
// with DISCONNECTED, reconnects and re-sends it.
//
// NOTE(review): failures are deliberately swallowed. call()/echo() log them with
// KJ_DBG and resolve normally instead of rejecting.
struct IpcClient
{
  AsyncIoContext ioContext;
  Own<NetworkAddress> networkAddress;
  Own<AsyncIoStream> connection;
  Own<TwoPartyClient> rpcClient;   // constructed over *connection, so must not outlive it
  IpcTest::Client client;          // bootstrap capability obtained from rpcClient
  bool connected;                  // set by connect(); cleared by call() on DISCONNECTED

  // Creates the event loop and resolves `address` synchronously. Does not connect.
  IpcClient(StringPtr address)
      : ioContext{setupAsyncIo()}, client{nullptr}, connected{false}
  {
    networkAddress = ioContext.provider->getNetwork()
        .parseAddress(address).wait(ioContext.waitScope);
  }

  // Opens a new connection and bootstraps a fresh IpcTest capability over it.
  Promise<void> connect()
  {
    return networkAddress->connect().then(
        [this] (Own<AsyncIoStream>&& stream) -> Promise<void> {
          // Drop the previous session in reverse order of construction before
          // replacing the stream. The old TwoPartyClient was built over the old
          // stream and must be destroyed while that stream still exists.
          client = nullptr;
          rpcClient = nullptr;
          connection = mv(stream);
          rpcClient = heap<TwoPartyClient>(*connection);
          client = rpcClient->bootstrap().castAs<IpcTest>();
          connected = true;
          return READY_NOW;
        });
  }

  // Resolves to true if already connected or if one of up to `tries` connection
  // attempts succeeds. Attempts are spaced one second apart. Resolves to false
  // once the attempts are exhausted.
  Promise<bool> reconnect(int tries = 1)
  {
    if (connected) return true;
    if (tries-- > 0) {
      return connect().then(
          [] () -> Promise<bool> { return true; },
          [this, tries] (Exception&& ex) -> Promise<bool> {
            KJ_DBG("connect failed", tries, ex);
            // Only wait if another attempt is actually left; otherwise report
            // failure immediately instead of sleeping just to return false.
            if (tries > 0) {
              return ioContext.provider->getTimer()
                  .afterDelay(1*SECONDS)
                  .then([this, tries] { return reconnect(tries); });
            }
            return false;
          });
    }
    return false;
  }

  // Connects if needed, then runs `makeRequest` (a callable returning
  // Promise<void>) with a 2 s timeout. On DISCONNECTED it waits 3 s,
  // reconnects (up to 3 attempts) and re-runs `makeRequest`, at most `retries`
  // more times. Every failure is logged and swallowed, so the returned promise
  // always resolves.
  template<typename Sender>
  Promise<void> call(Sender makeRequest, int retries = 1)
  {
    return reconnect(1).then([this, makeRequest, retries] (bool reconnected) -> Promise<void> {
      if (!reconnected) return READY_NOW;   // no connection: the request is dropped
      return ioContext.provider->getTimer()
          .timeoutAfter(2*SECONDS, makeRequest())
          .catch_([this, makeRequest, retries] (Exception&& ex) -> Promise<void> {
            KJ_DBG("call failed", retries, ex);
            if (ex.getType() != Exception::Type::DISCONNECTED) return READY_NOW;
            connected = false;
            return ioContext.provider->getTimer().afterDelay(3*SECONDS)
                .then([this, makeRequest, retries] {
                  return reconnect(3).then(
                      [this, makeRequest, retries] (bool reconnected) -> Promise<void> {
                        if (reconnected && retries > 0) return call(makeRequest, retries - 1);
                        return READY_NOW;
                      });
                });
          });
    });
  }

  // Calls the server's echo method with `value` and logs the returned value.
  Promise<void> echo(int value)
  {
    // Capture `this` rather than a copy of `client`, so a retry after
    // reconnect() uses the freshly bootstrapped capability.
    return call([this, value] {
      auto request = client.echoRequest();
      request.setValue(value);
      return request.send().then(
          [] (Response<IpcTest::EchoResults>&& response) -> Promise<void> {
            auto value = response.getValue();
            KJ_DBG("echo response", value);
            return READY_NOW;
          });
    });
  }
};
// Demo driver: five echo round-trips against a local server, each followed by
// a five-second pause on the client's timer.
int main()
{
  StringPtr serverAddress{"127.0.0.1:3456"};
  IpcClient ipc{serverAddress};
  auto& io = ipc.ioContext;

  // Connecting eagerly is optional: call() connects on first use.
  // ipc.connect().wait(io.waitScope);

  for (int i = 0; i != 5; ++i) {
    KJ_DBG("sending", i);
    ipc.echo(i)
        .then([&] { return io.provider->getTimer().afterDelay(5*SECONDS); })
        .wait(io.waitScope);
  }

  return 0;
}
// The RPC implementation sits on top of an implementation of `VatNetwork`. The `VatNetwork`
// determines how to form connections between vats -- specifically, two-way, private, reliable,
// sequenced datagram connections. The RPC implementation determines how to use such connections
// to manage object references and make method calls.
// Does the "reliable" here means that the connection state tracking, retries, etc. are supposed
// to be handled on the layer below RPC by something like ZeroMQ (implemented by custom
// VatNetwork subclass) or it is about lower level requirements such as packets within message
// are not reordered and such?
Actually, for many use cases it would probably be much simpler to just write some serialized data to a ZeroMQ REQ/REP socket, but full RPC looks nice and convenient; it is just unclear to me how to use it in a "production" setup. Does everyone who uses CapnProto RPC have some higher-level layer above it with state tracking, transaction semantics and such, and maybe also custom VatNetwork implementations, or am I missing something obvious here? Sorry for the silly questions; I just really like this library and am trying to figure out how to get the most out of it. :)
On Thu, Jul 5, 2018 at 1:24 PM, <bazu...@gmail.com> wrote:Actually for many use cases it will probably much simpler to just write some serialized data to ZeroMQ REQREP socket but full RPC looks nice and convenient it is just unclear for me how to use it in "production" setup. Is everyone who uses CapnProto RPC has some higher level layer above it with state tracking, transaction semantics and such and maybe also custom VanNetwork implementations or I am missing something obvious here? Sorry for silly questions, I just really like this library and trying figure out how to get the most out of it. :)I think you're expecting something here that isn't really possible. TCP is already a reliable transport -- as reliable as it can be. It already re-transmits packets after a timeout. Adding timeouts and retries on top of TCP will not make your transport more reliable.
It only makes sense to try to handle errors if there is some application-specific strategy you can use that TCP itself couldn't have. Cap'n Proto can't do this for you, because it depends on your app. Are there multiple servers you could be connecting to? If so, then it might make sense to try a different server if the first one times out. But Cap'n Proto doesn't know about your app's replication strategy, so it can't do this automatically. Retrying against the same server after a timeout probably isn't a good idea -- it will only "dig the hole deeper", overloading that server.
I find it's usually easiest to handle errors at a high level rather than a low level. Usually, it's much more obvious what to do about a problem at a higher level. In fact, I find in the vast majority of situations, the right way to handle an error is simply to abort and log the error. You can then monitor your logs and decide if error rates are too high in some area, and then focus on implementing better error handling strategies there.
In the event of disconnect all capabilities are lost, all in-flight calls are dropped, etc. All RPC state is effectively lost. Makes sense. Am I right that the only handling strategy here is to connect again with NetworkAddress, recreate the TwoPartyClient from the returned AsyncIoStream and bootstrap the root capability again from the client? There are mentions of persistent capabilities and "sturdy refs" in docs and sources. But I've not figured out what is this exactly, how to use them and if it will be any help at all in this case.To handle disconnects I need to attach "catch_" to every request send call which tries to reconnect (and bootstrap, etc.) on the error and repeat the call. I've tried to create a generic call wrapper function but there is a problem with it: it is impossible to store requests nor send the same request twice, so the only option is to capture the whole request generating code into a lambda and repeat that. I need to copy all the data into the lambda and rebuild the request every time before sending. Looks kind of awkward and inefficient. You can see an attempt in the code I've posted above. Maybe I've missed some lower level API which could help with that? Does the repeating the same request from the application level makes sense or I'm trying to do something silly here?
This makes sense in a general case but I'm using CapnProto for an IPC on a single device, do not need to load balance, etc.
In the event of disconnect all capabilities are lost, all in-flight calls are dropped, etc. All RPC state is effectively lost. Makes sense. Am I right that the only handling strategy here is to connect again with NetworkAddress, recreate the TwoPartyClient from the returned AsyncIoStream and bootstrap the root capability again from the client?
There are mentions of persistent capabilities and "sturdy refs" in docs and sources. But I've not figured out what is this exactly, how to use them and if it will be any help at all in this case.
To handle disconnects, I need to attach a "catch_" handler to every request send call, which tries to reconnect (and bootstrap, etc.) on error and then repeats the call. I've tried to create a generic call wrapper function, but there is a problem with it: it is impossible to store requests or to send the same request twice, so the only option is to capture the whole request-generating code in a lambda and repeat that. I need to copy all the data into the lambda and rebuild the request every time before sending. That looks kind of awkward and inefficient. You can see an attempt in the code I've posted above. Maybe I've missed some lower-level API which could help with that? Does repeating the same request at the application level make sense, or am I trying to do something silly here?
Is there some way to restore a broken connection without recreating all the objects anew and losing all the state? In theory, after the in-progress calls fail there should not be any leftover data to read or write. The AsyncIoStream is just a wrapper for a file descriptor. Does it matter for the code above that it was replaced? Or is the only option here to never let the CapnProto RPC system see the broken connection, by using something like ZeroMQ sockets underneath?
On Wed, Jul 11, 2018 at 4:10 AM, <bazu...@gmail.com> wrote:This makes sense in a general case but I'm using CapnProto for an IPC on a single device, do not need to load balance, etc.Ohhh, this simplifies things considerably. For local IPC, you can absolutely rely on the kernel to tell you if the service crashes. You will always get a DISCONNECTED exception immediately. So you only need to catch those and retry. You don't need to use timeouts.
There are mentions of persistent capabilities and "sturdy refs" in docs and sources. But I've not figured out what is this exactly, how to use them and if it will be any help at all in this case.SturdyRefs are sort of a "design pattern". It's not something built into the library, since implementing them requires deep knowledge of higher-level details. The idea is that you can call save() on a capability and receive back some sort of token that you can use to get that capability again in the future. But, what those tokens should look like and how exactly you restore them is not specified by Cap'n Proto.
To handle disconnects I need to attach "catch_" to every request send call which tries to reconnect (and bootstrap, etc.) on the error and repeat the call. I've tried to create a generic call wrapper function but there is a problem with it: it is impossible to store requests nor send the same request twice, so the only option is to capture the whole request generating code into a lambda and repeat that. I need to copy all the data into the lambda and rebuild the request every time before sending. Looks kind of awkward and inefficient. You can see an attempt in the code I've posted above. Maybe I've missed some lower level API which could help with that? Does the repeating the same request from the application level makes sense or I'm trying to do something silly here? I generally recommend catching the DISCONNECTED exception somewhere higher-level in your program, not at every single call site. I think this is where your complexity is coming from. Try to think about what the overall operation you are performing is (which may consist of a sequence of multiple calls). Add an exception handler to the overall operation which restarts the whole thing.
FWIW, here's a utility class from the Sandstorm codebase that helps with handling disconnects: CapRedirector. It creates a capability which proxies calls, but when any call fails with DISCONNECTED, it initiates a reconnect callback and blocks subsequent calls until the reconnect completes. However, note that it does not automatically retry the call which threw DISCONNECTED, since only the application knows what kinds of calls are safe to retry. So the app still needs to catch and retry, but at least the exception handler doesn't need to figure out for itself how to reconnect.
client = capnp::Capability::Client{kj::heap<CapRedirector>([this] () { return networkAddress->connect().then( [this] (kj::Own<kj::AsyncIoStream>&& stream) { KJ_DBG("connected"); connection = mv(stream); twoPartyClient = kj::heap<capnp::TwoPartyClient>(*connection); return twoPartyClient->bootstrap(); });})}.castAs<typename T::Client>();This utility should probably be moved to the Cap'n Proto library at some point. I think there's also room for some utility code for automatically retrying an operation on DISCONNECTED exceptions.-Kenton
On Wednesday, 11 July 2018 19:16:03 UTC+2, Kenton Varda wrote:On Wed, Jul 11, 2018 at 4:10 AM, <bazu...@gmail.com> wrote:This makes sense in a general case but I'm using CapnProto for an IPC on a single device, do not need to load balance, etc.Ohhh, this simplifies things considerably. For local IPC, you can absolutely rely on the kernel to tell you if the service crashes. You will always get a DISCONNECTED exception immediately. So you only need to catch those and retry. You don't need to use timeouts.Only for unix sockets though. For TCP the first syscall after the disconnect always succeeds and the RESET packet is received. Only the next syscall will return "connection reset". Luckily CapnProto RPC does at least 2 writeMessage to the socket (call and finish) and also tries to read responses so there are enough actual syscalls to detect the disconnect it seems.
Thank you! This is much closer to the solution I've envisioned. It also demonstrates advanced capability APIs. I've tried to integrate this CapRedirector and having some problems.I'm using it like this:client = capnp::Capability::Client{kj::heap<CapRedirector>([this] () {return networkAddress->connect().then([this] (kj::Own<kj::AsyncIoStream>&& stream) {KJ_DBG("connected");connection = mv(stream);twoPartyClient = kj::heap<capnp::TwoPartyClient>(*connection);return twoPartyClient->bootstrap();});})}.castAs<typename T::Client>();If the server was not restarted immediately after the call failed with the disconnected exception the next error will be "Connection refused" which is expected. But even after the server is started the next call also fails with the same "Connection refused" exception. So the first call after the restart is also lost. This is because the reconnect attempt is in the exception handler.
On Thu, Jul 12, 2018 at 2:55 PM, <bazu...@gmail.com> wrote:Only for unix sockets though. For TCP the first syscall after the disconnect always succeeds and the RESET packet is received. Only the next syscall will return "connection reset". Luckily CapnProto RPC does at least 2 writeMessage to the socket (call and finish) and also tries to read responses so there are enough actual syscalls to detect the disconnect it seems.Eh? You should get a notification of disconnect without doing any new writes at all. When the other side crashes, the OS should proactively flag the socket as readable, and then reading should produce an error, which should cause Cap'n Proto to error out all pending RPCs. This should be the case even for TCP sockets.
Thank you! This is much closer to the solution I've envisioned. It also demonstrates advanced capability APIs. I've tried to integrate this CapRedirector and having some problems.I'm using it like this:client = capnp::Capability::Client{kj::heap<CapRedirector>([this] () {return networkAddress->connect().then([this] (kj::Own<kj::AsyncIoStream>&& stream) {KJ_DBG("connected");connection = mv(stream);twoPartyClient = kj::heap<capnp::TwoPartyClient>(*connection);return twoPartyClient->bootstrap();});})}.castAs<typename T::Client>();If the server was not restarted immediately after the call failed with the disconnected exception the next error will be "Connection refused" which is expected. But even after the server is started the next call also fails with the same "Connection refused" exception. So the first call after the restart is also lost. This is because the reconnect attempt is in the exception handler.You will need to design your reconnect callback such that it waits until the server is reachable. You could maybe use a loop with exponential fallback (after each failure, pause for twice as long as the previous pause).CapRedirector doesn't do this internally because it depends on the use case -- e.g. in Sandstorm the reconnect callback is sometimes responsible for actually starting the destination server, not just connecting to it.
On Saturday, 14 July 2018 07:19:24 UTC+2, Kenton Varda wrote:On Thu, Jul 12, 2018 at 2:55 PM, <bazu...@gmail.com> wrote:Only for unix sockets though. For TCP the first syscall after the disconnect always succeeds and the RESET packet is received. Only the next syscall will return "connection reset". Luckily CapnProto RPC does at least 2 writeMessage to the socket (call and finish) and also tries to read responses so there are enough actual syscalls to detect the disconnect it seems.Eh? You should get a notification of disconnect without doing any new writes at all. When the other side crashes, the OS should proactively flag the socket as readable, and then reading should produce an error, which should cause Cap'n Proto to error out all pending RPCs. This should be the case even for TCP sockets.I was referring to my first attempt at using it without an RPC. I was just happily writing messages to socket hoping that I will receive an error on disconnect. After some researching I've figured now that watching it for 0 read seems to be the only way to detect if the other side is closed its end. Sorry for the confusion.
Thank you! This is much closer to the solution I've envisioned. It also demonstrates advanced capability APIs. I've tried to integrate this CapRedirector and having some problems.I'm using it like this:client = capnp::Capability::Client{kj::heap<CapRedirector>([this] () {return networkAddress->connect().then([this] (kj::Own<kj::AsyncIoStream>&& stream) {KJ_DBG("connected");connection = mv(stream);twoPartyClient = kj::heap<capnp::TwoPartyClient>(*connection);return twoPartyClient->bootstrap();});})}.castAs<typename T::Client>();If the server was not restarted immediately after the call failed with the disconnected exception the next error will be "Connection refused" which is expected. But even after the server is started the next call also fails with the same "Connection refused" exception. So the first call after the restart is also lost. This is because the reconnect attempt is in the exception handler.You will need to design your reconnect callback such that it waits until the server is reachable. You could maybe use a loop with exponential fallback (after each failure, pause for twice as long as the previous pause).CapRedirector doesn't do this internally because it depends on the use case -- e.g. in Sandstorm the reconnect callback is sometimes responsible for actually starting the destination server, not just connecting to it.Sorry for a stupid question but how could I wait in the reconnect? It actually runs inside the disconnect event callback. I'm could not find a simple fix, maybe I'm missing something obvious. I will try to write something similar to CapRedirector more fitting for my use case but I wanted to be sure that I understand how it works first.
Sorry for a stupid question but how could I wait in the reconnect? It actually runs inside the disconnect event callback. I could not find a simple fix, maybe I'm missing something obvious. I will try to write something similar to CapRedirector more fitting for my use case but I wanted to be sure that I understand how it works first. You need to do a loop of trying to connect and, if it fails, wait some time, probably with exponential backoff. Something like (not tested): kj::Promise<kj::Own<kj::AsyncIoStream>> keepTryingToConnect(kj::Timer& timer, kj::NetworkAddress& addr, kj::Duration pauseTime = 100 * kj::MILLISECONDS) {return kj::evalNow([&]() { return addr.connect(); }).catch_([&timer, &addr, pauseTime](kj::Exception&& e)-> kj::Promise<kj::Own<kj::AsyncIoStream>> {if (e.getType() == kj::Exception::Type::DISCONNECTED) {return timer.afterDelay(pauseTime).then([&timer, &addr, pauseTime]() {return keepTryingToConnect(timer, addr, pauseTime * 2);});} else {return kj::mv(e);}});}
#include <capnp/capability.h>
#include <kj/async.h>
#include <kj/debug.h>
#include <kj/function.h>

// A capability that forwards every call it receives to a client obtained from
// the `reconnect` callback. After a forwarded call on the current client fails
// with DISCONNECTED, the next incoming call first invokes `reconnect` again to
// obtain a fresh client.
//
// The call that observed DISCONNECTED is not retried. Its exception is passed
// back to the caller, who knows whether retrying that particular call is safe.
// Failures of any other type leave the current client in place.
class CapReconnector : public capnp::Capability::Server, public kj::Refcounted
{
public:
  // Invokes `reconnect` once, immediately, to obtain the initial client.
  CapReconnector(kj::Function<capnp::Capability::Client()> reconnect);

  kj::Promise<void> dispatchCall(
      uint64_t interfaceId, uint16_t methodId,
      capnp::CallContext<capnp::AnyPointer, capnp::AnyPointer> context) override;

private:
  // True once a call on the *current* client has failed with DISCONNECTED.
  // Cleared as soon as the client is replaced.
  bool disconnected;
  // Declared before `reconnect`. The constructor initializes it by calling the
  // callback before that callback is moved into the member below.
  capnp::Capability::Client client;
  kj::Function<capnp::Capability::Client()> reconnect;
  // Incremented each time `client` is replaced, so failures reported by a
  // client that has already been replaced can be recognized as stale.
  uint64_t generation;
};

CapReconnector::CapReconnector(kj::Function<capnp::Capability::Client()> reconnect)
  : disconnected{false},          // the client created below is brand new
    client{reconnect()},          // `reconnect` here is the constructor parameter
    reconnect{kj::mv(reconnect)},
    generation{0}
{ }

kj::Promise<void> CapReconnector::dispatchCall(
    uint64_t interfaceId, uint16_t methodId,
    capnp::CallContext<capnp::AnyPointer, capnp::AnyPointer> context)
{
  if (disconnected) {
    // Replace the broken client exactly once. Calls arriving while the new
    // client is still being set up reuse it instead of each reconnecting
    // again and discarding the previous attempt.
    client = reconnect();
    disconnected = false;
    ++generation;
  }
  const uint64_t callGeneration = generation;

  capnp::AnyPointer::Reader params = context.getParams();
  auto req = client.typelessRequest(interfaceId, methodId, params.targetSize());
  req.set(params);
  context.releaseParams();   // already copied into `req`; `params` is not used below

  return req.send().then(
      [context] (auto&& response) mutable -> kj::Promise<void> {
        context.initResults(response.targetSize()).set(response);
        return kj::READY_NOW;
      },
      [this, callGeneration] (kj::Exception&& e) -> kj::Promise<void> {
        // Ignore DISCONNECTED from a client that has since been replaced.
        // Otherwise a late failure of an old call would tear down the
        // fresh connection.
        if (e.getType() == kj::Exception::Type::DISCONNECTED &&
            callGeneration == generation) {
          disconnected = true;
        }
        return kj::mv(e);
      });
}