joinPromises for remote promises?

114 views
Skip to first unread message

Brian Taylor

unread,
Jan 12, 2015, 3:46:52 PM1/12/15
to capn...@googlegroups.com
I'm really new here. I could be going about his all wrong...

I have two RPC functions:

  summarize @0 (fields :List(Field)) -> (summary :Summary);
  field @1 (angle: Angle) -> (field :Field);

  struct Summary {
    value @0 :Float64;
  }

  struct Field {
  }

  struct Angle {
    value @0 :Float64;
  }


I'd like to dispatch many requests to field and then collect the results to pass to summarize. I'd like to take advantage of pipelining to allow the results from the calls to field never to have to be returned to the client if they aren't viewed.

My approach:

  auto request = server.summarizeRequest();
  auto fieldPromises = kj::heapArrayBuilder<capnp::RemotePromise<Server::Field>>(N);

  for(uint a = 0; a < N; ++a) {
    auto fieldRequest = server.fieldRequest();
    fieldRequest.getAngle().setValue((double)a);
    fieldPromises.add(fieldRequest.send().getField());
  }

  auto promiseFields = kj::joinPromises(fieldPromises.finish()); // no good. can't convert RemotePromise<_> to Promise<_>
  request.setFields(promiseFields);

  auto response = request.send().wait(waitScope);
  std::cout << "value = " << response.getSummary().getValue() << std::endl;

I'd welcome commentary on my approach in general. Ultimately I want to scatter these calls to field across multiple Clients and then gather the results via summarize to send back to the caller.

Thanks!

Kenton Varda

unread,
Jan 12, 2015, 6:07:38 PM1/12/15
to Brian Taylor, capnproto
Hi Brian,

First, note that at present pipelining only applies to capabilities (interface references). Unfortunately, since `Field` is a struct, you cannot currently pipeline it back. You can solve this by turning Field into an interface, perhaps with a get() method that returns the original struct. Also note that using pipelining does not prevent the server from returning data to the client, but simply allows the client to avoid waiting for that to happen before sending a new request to the server incorporating the results of the previous call. However, wrapping the data in an interface with a get() method will, of course, prevent the data from being returned (unless the client calls get() explicitly).

So, let's assume you turn `Field` into a capability. Then, the following applies:

A `capnp::RemotePromise<T>` is a combination of two things:
- A `kj::Promise<capnp::Response<T>>`, used for waiting on the full response.
- A `T::Pipeline`, used for expressing pipelined requests.

You can actually decompose it into both pieces with code like:

    kj::Promise<capnp::Response<FieldResults>> promise = kj::mv(remotePromise);
    FieldResults::Pipeline pipeline = kj::mv(remotePromise);

In your case, it sounds like you actually only care about the pipeline part. You don't need the promise part at all -- you actually don't even need to join the promises.

In fact, `fieldRequest.send().getField()` actually returns type `Field`, not a `RemotePromise` (again, assuming `Field` is an interface type; if it were a struct, you'd get Field::Pipeline). So, you can build an Array<Field> which you can then pass into the `summarize` request. The rest of the RemotePromise can then be discarded.

-Kenton

--
You received this message because you are subscribed to the Google Groups "Cap'n Proto" group.
To unsubscribe from this group and stop receiving emails from it, send an email to capnproto+...@googlegroups.com.
Visit this group at http://groups.google.com/group/capnproto.

Brian Taylor

unread,
Jan 12, 2015, 8:48:29 PM1/12/15
to capn...@googlegroups.com, el....@gmail.com
Thank you Kenton! Your explanation clarified several important points for me.

I made field an interface instead of a struct which actually fits the semantics of what I'm trying to do better. Field effectively becomes a handle that you can call "get" on to look at the data it represents. This is ideal because what it represents in my case is often multiple GB of data that are just going to be pumped into the next operation. Now my gather operation looks like this and compiles cleanly:

  auto request = server.summarizeRequest();
  auto fields = request.initFields(N);

  for(uint a = 0; a < N; ++a) {
    auto fieldRequest = envserver.fieldRequest();
    fieldRequest.getAngle().setValue((double)a);
    fields.set(a, server.send().getField());
  }

  auto response = request.send().wait(waitScope);

  std::cout << "value = " << response.getSummary().getValue() << std::endl;


Reply all
Reply to author
Forward
0 new messages