Convert boost::promise to kj::promise

383 views
Skip to first unread message

sanj...@gmail.com

unread,
Jan 9, 2016, 12:57:19 AM1/9/16
to Cap'n Proto

I need to convert a boost::promise to a kj::Promise so the functions being called can blend into the kj async framework.  I managed to get something working using the Promise Adapter framework in kj.   Can Kenton and others check this code ?  I have some open questions in the code below.

I can contribute a patch to capnproto if this class is of general use.


/*
 * g++ -g -std=c++11 kj_boost_promise.cpp -lkj-async -lkj -lboost_thread -lboost_system -lpthread -lrt
 */
#include <iostream>

#include <kj/async.h>
#include <kj/async-io.h>

#define BOOST_THREAD_PROVIDES_FUTURE
#define BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION
#include <boost/thread/future.hpp> // Executor
#include <boost/thread.hpp> // Executor

// boost::promise
// -> boost::future
//    -> kj::PromiseFulfiller
//       -> kj::Promise

using boost::async;
using kj::PromiseFulfiller;

/**
 * Questions
 * 1. How to handle boost::promise::set_exception
 * 2. Destructor of adapter
 * 3. Also handle Boost shared future?
 */

template <class T>
class BoostPromiseAdapter
{
public:
    typedef boost::future<T> BF;
    typedef boost::promise<T> BP;

    BoostPromiseAdapter(PromiseFulfiller<BF>& promiseFulfiller,
        BF bf)
        : promiseFulfiller_(promiseFulfiller)
        , bf_(std::move(bf))
    {
        bf_.then([&] (BF samef) {
            promiseFulfiller_.fulfill(std::move(samef));
 });
    }

    ~BoostPromiseAdapter() noexcept(false)
    {
        // ??
    }

    PromiseFulfiller<BF>& promiseFulfiller_;
    BF bf_;
};


template <class T>
kj::Promise<boost::future<T>> getAdapted(boost::promise<T>& bp)
{
    return kj::newAdaptedPromise<boost::future<T>,
        BoostPromiseAdapter<T>>(bp.get_future());
}

typedef boost::future<int> BF;typedef boost::promise<int> BP;

int main(int argc, char* argv[])
{
    boost::promise<int> bp;

    auto ioctx = kj::setupAsyncIo();

    auto kjp = getAdapted<BF::value_type>(bp);

    bp.set_value(7);
    //std::exception e;
    //bp.set_exception(make_exception_ptr(e));

    auto kjBoostPromise = kjp.wait(ioctx.waitScope);

    try {
        std::cout << kjBoostPromise.get() << std::endl;
    } catch(std::exception& e) {
        // Problem - the exception is not being caught here
        std::cout << "exception " << e.what() << std::endl;
    }
}


Kenton Varda

unread,
Jan 10, 2016, 11:20:57 PM1/10/16
to Sandeep Joshi, Cap'n Proto
Hi Sandeep,

I haven't used this Boost library so I could be missing things, but here are my thoughts:

First, note that Boost's futures are meant to cross threads whereas KJ's promise framework is not thread-aware. You'll need to make sure that boost's .then() calls the callback in the correct thread.

Regarding exceptions, your approach below is one possibility: basically, the promise resolves when the boost future is ready, but it's still up to the caller to call .get() which may throw an exception. If you'd rather have a boost future exception propagate directly into a KJ promise exception, you could do it by actually calling .get() inside the boost future callback:

       bf_.then([this] (BF samef) {
          promiseFulfiller_.rejectIfThrows([&]() {
            promiseFulfiller_.fulfill(samef.get());
          });
       });

Above, if get() throws, it'll be equivalent to calling promiseFulfiller_.reject().

In this approach, boost::future<T> would translate into kj::Promise<T>, *not* kj::Promise<boost::future<T>> -- this seems like an improvement.

Regarding the adapter's destructor: It should "cancel" the operation somehow, such that the callback passed to future.then() is never called. Boost futures mighht not support cancellation, though. You could work around this by documenting your BoostPromiseAdapter to say that users must not cancel the adapted promise (i.e. they must not destroy it until it completes). Otherwise, you might want to try a completely different approach: use kj::newPromiseFulfillerPair<T>() instead of PromiseAdapter. This gives you a promise/fulfiller pair which are analogous to Boost's future/promise pair (it is unfortunate that Boost uses the word "promise" differently from everyone else, ugh). You can then arrange to fulfill the KJ fulfiller when the Boost future is done, while returning the KJ promise separately. Destroying the KJ promise will simply cause the fulfiller to be ignored.

Regarding shared_future, I don't think it changes anything in particular. Did you have a specific question there?

-Kenton

--
You received this message because you are subscribed to the Google Groups "Cap'n Proto" group.
To unsubscribe from this group and stop receiving emails from it, send an email to capnproto+...@googlegroups.com.
Visit this group at https://groups.google.com/group/capnproto.

Sandeep Joshi

unread,
Jan 11, 2016, 4:16:03 AM1/11/16
to Kenton Varda, Cap'n Proto
On Mon, Jan 11, 2016 at 9:50 AM, Kenton Varda <ken...@sandstorm.io> wrote:
Hi Sandeep,

I haven't used this Boost library so I could be missing things, but here are my thoughts:

First, note that Boost's futures are meant to cross threads whereas KJ's promise framework is not thread-aware. You'll need to make sure that boost's .then() calls the callback in the correct thread.

You are right.  I have to use boost::launch::deferred in my use-case.
 

Regarding exceptions, your approach below is one possibility: basically, the promise resolves when the boost future is ready, but it's still up to the caller to call .get() which may throw an exception. If you'd rather have a boost future exception propagate directly into a KJ promise exception, you could do it by actually calling .get() inside the boost future callback:

       bf_.then([this] (BF samef) {
          promiseFulfiller_.rejectIfThrows([&]() {
            promiseFulfiller_.fulfill(samef.get());
          });
       });

Above, if get() throws, it'll be equivalent to calling promiseFulfiller_.reject().

In this approach, boost::future<T> would translate into kj::Promise<T>, *not* kj::Promise<boost::future<T>> -- this seems like an improvement.

It certainly is an improvement !  I made the changes and they worked.
 

Regarding the adapter's destructor: It should "cancel" the operation somehow, such that the callback passed to future.then() is never called. Boost futures mighht not support cancellation, though. You could work around this by documenting your BoostPromiseAdapter to say that users must not cancel the adapted promise (i.e. they must not destroy it until it completes). Otherwise, you might want to try a completely different approach: use kj::newPromiseFulfillerPair<T>() instead of PromiseAdapter. This gives you a promise/fulfiller pair which are analogous to Boost's future/promise pair (it is unfortunate that Boost uses the word "promise" differently from everyone else, ugh). You can then arrange to fulfill the KJ fulfiller when the Boost future is done, while returning the KJ promise separately. Destroying the KJ promise will simply cause the fulfiller to be ignored.


Maybe I am missing something in your alternate approach, because the comment in kj/async.h says

"// Although this function is easier to use than `newAdaptedPromise()`, it has the serious drawback
// that there is no way to handle cancellation (i.e. detect when the Promise is discarded)."

https://github.com/sandstorm-io/capnproto/blob/master/c%2B%2B/src/kj/async.h#L473

Is this comment outdated ?

OTOH, the AdapterPromiseNode::fulfill() checks the waiting flag but I can't find who is setting it

https://github.com/sandstorm-io/capnproto/blob/master/c%2B%2B/src/kj/async-inl.h#L718


 

Regarding shared_future, I don't think it changes anything in particular. Did you have a specific question there?

No, it was an internal question which didn't require your attention :-)
 

Kenton Varda

unread,
Jan 11, 2016, 1:08:01 PM1/11/16
to Sandeep Joshi, Cap'n Proto
On Mon, Jan 11, 2016 at 1:16 AM, Sandeep Joshi <sanj...@gmail.com> wrote:
On Mon, Jan 11, 2016 at 9:50 AM, Kenton Varda <ken...@sandstorm.io> wrote:
Hi Sandeep,

I haven't used this Boost library so I could be missing things, but here are my thoughts:

First, note that Boost's futures are meant to cross threads whereas KJ's promise framework is not thread-aware. You'll need to make sure that boost's .then() calls the callback in the correct thread.

You are right.  I have to use boost::launch::deferred in my use-case.

Hmm, but this seems like it would require integrating KJ's event loop with Boost's event loop. Have you written such an integration?
 
Maybe I am missing something in your alternate approach, because the comment in kj/async.h says

"// Although this function is easier to use than `newAdaptedPromise()`, it has the serious drawback
// that there is no way to handle cancellation (i.e. detect when the Promise is discarded)."

https://github.com/sandstorm-io/capnproto/blob/master/c%2B%2B/src/kj/async.h#L473

Is this comment outdated ?

No, the comment is correct. This approach won't handle cancellation. However, it will at least allow the KJ Promise to be destroyed early without causing memory corruption. The effect is that the future will keep running but won't report the result anywhere. Unless boost futures or the app provide some way to handle cancellation, I think this is the best you can do.
 
OTOH, the AdapterPromiseNode::fulfill() checks the waiting flag but I can't find who is setting it

fulfill() and reject() set it. The purpose of the flag is to make sure only the first call to fulfill() or reject() is honored.

-Kenton

sanj...@gmail.com

unread,
Jan 14, 2016, 4:07:15 AM1/14/16
to Cap'n Proto, sanj...@gmail.com


On Monday, January 11, 2016 at 11:38:01 PM UTC+5:30, Kenton Varda wrote:
On Mon, Jan 11, 2016 at 1:16 AM, Sandeep Joshi <sanj...@gmail.com> wrote:
On Mon, Jan 11, 2016 at 9:50 AM, Kenton Varda <ken...@sandstorm.io> wrote:
Hi Sandeep,

I haven't used this Boost library so I could be missing things, but here are my thoughts:

First, note that Boost's futures are meant to cross threads whereas KJ's promise framework is not thread-aware. You'll need to make sure that boost's .then() calls the callback in the correct thread.

You are right.  I have to use boost::launch::deferred in my use-case.

Hmm, but this seems like it would require integrating KJ's event loop with Boost's event loop. Have you written such an integration?

Nope.  When you specify a deferred launch in Boost, the continuation gets executed in the current thread when future.get() is called.

In order to integrate such functionality with the kj event loop, I would have to capture all the outstanding boost promises in the current thread and feed it into my implementation of a kj::EventPort.  Is that more or less correct ?  I have seen the code you posted earlier for node-capnp
https://github.com/kentonv/node-capnp/blob/master/src/node-capnp/capnp.cc

Would it easier to put a mutex-type wrapper on KJ promises to make them thread-safe ?

sanj...@gmail.com

unread,
Jan 14, 2016, 6:16:33 AM1/14/16
to Cap'n Proto, sanj...@gmail.com


On Thursday, January 14, 2016 at 2:37:15 PM UTC+5:30, sanj...@gmail.com wrote:


On Monday, January 11, 2016 at 11:38:01 PM UTC+5:30, Kenton Varda wrote:
On Mon, Jan 11, 2016 at 1:16 AM, Sandeep Joshi <sanj...@gmail.com> wrote:
On Mon, Jan 11, 2016 at 9:50 AM, Kenton Varda <ken...@sandstorm.io> wrote:
Hi Sandeep,

I haven't used this Boost library so I could be missing things, but here are my thoughts:

First, note that Boost's futures are meant to cross threads whereas KJ's promise framework is not thread-aware. You'll need to make sure that boost's .then() calls the callback in the correct thread.

You are right.  I have to use boost::launch::deferred in my use-case.

Hmm, but this seems like it would require integrating KJ's event loop with Boost's event loop. Have you written such an integration?

Nope.  When you specify a deferred launch in Boost, the continuation gets executed in the current thread when future.get() is called.

In order to integrate such functionality with the kj event loop, I would have to capture all the outstanding boost promises in the current thread and feed it into my implementation of a kj::EventPort.  Is that more or less correct ?  I have seen the code you posted earlier for node-capnp
https://github.com/kentonv/node-capnp/blob/master/src/node-capnp/capnp.cc

Would it easier to put a mutex-type wrapper on KJ promises to make them thread-safe ?


Alternately, I could write to a pipe in the boost continuation (in any thread) and let the corresponding kj::evalLater() wait on the read end.
The promise returned from  evalLater() can be fed to the ioContext.
Would this work ?
If yes, is there a way I can avoid creating a pipe for every conversion from a boost::promise to kj::Promise ?




 

Kenton Varda

unread,
Jan 15, 2016, 5:19:54 PM1/15/16
to Sandeep Joshi, Cap'n Proto
On Thu, Jan 14, 2016 at 1:07 AM, <sanj...@gmail.com> wrote:
Nope.  When you specify a deferred launch in Boost, the continuation gets executed in the current thread when future.get() is called.

In order to integrate such functionality with the kj event loop, I would have to capture all the outstanding boost promises in the current thread and feed it into my implementation of a kj::EventPort.  Is that more or less correct ?  I have seen the code you posted earlier for node-capnp
https://github.com/kentonv/node-capnp/blob/master/src/node-capnp/capnp.cc

It's more like, you adapt the KJ event loop to sit on top of boost's, so that it knows how to push its own events through boost. You should follow what I did in node-capnp, yes.
 
Would it easier to put a mutex-type wrapper on KJ promises to make them thread-safe ?

It's trickier than that. When an event happens cross-thread, you need a way to wake up the receiving thread if it is currently asleep.

For now, the easiest thing to do is to create a pipe where the receiving thread is always waiting on the pipe and the sending thread sends a byte on the pipe whenever a new event is incoming. Or, you might be able to use Unix signals (but they're ugly), or if you're on Linux you might try an eventfd.

Longer-term, we plan to add cross-thread event queuing to KJ -- in a way that adds no overhead for people who don't want it. Note that mutex locking is expensive so we don't want to just throw mutexes everywhere. (Actually, an early version of the KJ event loop used lots of lockless data structures but the atomic ops turned out too expensive, so I gave up and went single-threaded for the initial release.)

Alternately, I could write to a pipe in the boost continuation (in any thread) and let the corresponding kj::evalLater() wait on the read end.
The promise returned from  evalLater() can be fed to the ioContext.
Would this work ?

For the pipe, doing read() inside evalLater() will block and stall the event loop. What you want to do instead is use AsyncInputStream:

    auto stream = ioContext.lowLevelProvider->wrapInputFd(pipeFds[0]);
    stream->read(buffer, size);  // returns a Promise<void>

Otherwise, yes, that works.
 
If yes, is there a way I can avoid creating a pipe for every conversion from a boost::promise to kj::Promise ?

You can have a single pipe you use for wakeup, and a separate event queue protected by a mutex which you check whenever you receive a byte on the pipe. Or you could write the actual event data directly through the pipe. Or maybe you could write a pointer through the pipe, so the receiving end can just grab the pointer and take ownership, although that can get awkward.

-Kenton

Sandeep Joshi

unread,
Jan 19, 2016, 7:19:42 AM1/19/16
to Kenton Varda, Cap'n Proto

I need to pick your brain a little more ;-)

I am able to connect the boost::future ==> unix pipe ==> kj::Promise, but I am having difficulty demultiplexing multiple promises whose values are being written to the same pipe, primarily because I don't know how to tie the kj::Promise returned from kj::evalLater() to the actual promise I want to wake up.  

Do I tie them with a PromiseFulfillerPair or another continuation ?

Can the kj::event loop repeatedly check in non-blocking fashion for the same promises until they are ready ?

Can I call kj::evalLater() within the lambda passed to evalLater() to re-register the function if there are multiple waiting promises ?

struct BoostToCapnp
{
   int pipefd_[2];
   int buffer;
};

kj::Promise<T> getAdapted(kj::AsyncIoContext& ioContext, boost::future<T>&& bf)
{
  auto obj = new BoostToCapnp(); // todo: allocate once

  bf.then(boost::launch::async,
    [obj, &ioContext] (boost::future<T> samef) {
      obj->buffer = samef.get();
      write(obj->pipefd_[1], &obj->buffer, sizeof(obj->buffer));
    });

  return kj::evalLater([obj, &ioContext]() {
      auto readEndOfPipe =
        ioContext.lowLevelProvider->wrapInputFd(obj->pipefd_[0]);
 
      return readEndOfPipe->read(&obj->buffer, sizeof(obj->buffer), sizeof(obj->buffer))
        .then([obj] (size_t n) {
          return obj->buffer;
        })
        .attach(kj::mv(readEndOfPipe));
    });
}

 

-Kenton

Kenton Varda

unread,
Jan 22, 2016, 8:55:36 PM1/22/16
to Sandeep Joshi, Cap'n Proto
On Tue, Jan 19, 2016 at 4:19 AM, Sandeep Joshi <sanj...@gmail.com> wrote:
I need to pick your brain a little more ;-)

I am able to connect the boost::future ==> unix pipe ==> kj::Promise, but I am having difficulty demultiplexing multiple promises whose values are being written to the same pipe, primarily because I don't know how to tie the kj::Promise returned from kj::evalLater() to the actual promise I want to wake up.  

Do I tie them with a PromiseFulfillerPair or another continuation ?

Yes, you'll probably want to use PromiseFulfillerPairs. I imagine you'd have something like a map<int, kj::Own<kj::PromiseFulfiller<T>>> assigning IDs to each fulfiller, and you'd write the ID through the pipe.
 
Can the kj::event loop repeatedly check in non-blocking fashion for the same promises until they are ready ?

Sorry, I don't understand what you mean here.
 
Can I call kj::evalLater() within the lambda passed to evalLater() to re-register the function if there are multiple waiting promises ?

Again not sure what you're suggesting here, but I would say that a polling busy loop is definitely not what you want.

-Kenton

Sandeep Joshi

unread,
Jan 24, 2016, 11:19:15 AM1/24/16
to Kenton Varda, Cap'n Proto

I guess I am able to articulate the solution but unable to actualize it, given the massively useful but equally intricate framework you have built ;-)

I want to do promise pipelining - return many promises (RemotePromise) from the server to the client and let it retrieve the results as and when they are ready.   But I don't understand how to bind the promise returned by kj::evalLater() with the promise which has been returned to the client.  The evalLater() is needed to read the pipe, but thats not the promise which will be returned to the client.  Thats the crux of the problem. 

Can you tell me if the following will work with capnp ?  
Do I have to call kj::evalLater() on every adapted promise ?
Ignore obvious memory leaks, but do analyze the code for capnproto leaks.

template <class T>
class BoostToCapnp
{
  public:

  std::map<int, kj::Own<kj::PromiseFulfiller<T>>> map_;
  int pipefd_[2];
  int idx_ = 0;

  explicit BoostToCapnp()
  {
    int err = pipe2(pipefd_, O_NONBLOCK);
    assert (!err);
  }

  virtual ~BoostToCapnp()
  {
    close(pipefd_[0]);
    close(pipefd_[1]);
  }
};

template <class T>
kj::Promise<int> getAdapted(kj::AsyncIoContext& ioContext, boost::future<T>&& bf)
{
  static BoostToCapnp<T> *obj = nullptr;
  if (!obj) { obj = new BoostToCapnp<T>(); }

  auto pair = kj::newPromiseAndFulfiller<T>();
  obj->map_.insert(std::make_pair(obj->idx_, kj::mv(pair.fulfiller)));

  int idx = obj->idx_ ++;

  bf.then([idx, &ioContext] (boost::future<T> samef) {
      int futureValue = samef.get();
      (void) futureValue; // TODO write later
      write(obj->pipefd_[1], &idx, sizeof(idx));
    });

  kj::evalLater([&ioContext]() {

      auto readEndOfPipe =
        ioContext.lowLevelProvider->wrapInputFd(obj->pipefd_[0]);

      int myidx;

      readEndOfPipe->read(&myidx, sizeof(int), sizeof(int))
        .then([myidx] (size_t n) {
          auto iter = obj->map_.find(myidx);
          iter->second->fulfill(1); // TODO use futureValue
          obj->map_.erase(iter);
        })
        .attach(kj::mv(readEndOfPipe));
    });

  return kj::mv(pair.promise);
}

-Sandeep

 

-Kenton

Kenton Varda

unread,
Jan 25, 2016, 1:01:52 AM1/25/16
to Sandeep Joshi, Cap'n Proto
On Sun, Jan 24, 2016 at 8:19 AM, Sandeep Joshi <sanj...@gmail.com> wrote:


On Sat, Jan 23, 2016 at 7:25 AM, Kenton Varda <ken...@sandstorm.io> wrote:
On Tue, Jan 19, 2016 at 4:19 AM, Sandeep Joshi <sanj...@gmail.com> wrote:
I need to pick your brain a little more ;-)

I am able to connect the boost::future ==> unix pipe ==> kj::Promise, but I am having difficulty demultiplexing multiple promises whose values are being written to the same pipe, primarily because I don't know how to tie the kj::Promise returned from kj::evalLater() to the actual promise I want to wake up.  

Do I tie them with a PromiseFulfillerPair or another continuation ?

Yes, you'll probably want to use PromiseFulfillerPairs. I imagine you'd have something like a map<int, kj::Own<kj::PromiseFulfiller<T>>> assigning IDs to each fulfiller, and you'd write the ID through the pipe.
 
Can the kj::event loop repeatedly check in non-blocking fashion for the same promises until they are ready ?

Sorry, I don't understand what you mean here.
 
Can I call kj::evalLater() within the lambda passed to evalLater() to re-register the function if there are multiple waiting promises ?

Again not sure what you're suggesting here, but I would say that a polling busy loop is definitely not what you want.


I guess I am able to articulate the solution but unable to actualize it, given the massively useful but equally intricate framework you have built ;-)

I want to do promise pipelining - return many promises (RemotePromise) from the server to the client and let it retrieve the results as and when they are ready.   But I don't understand how to bind the promise returned by kj::evalLater() with the promise which has been returned to the client.  The evalLater() is needed to read the pipe, but thats not the promise which will be returned to the client.  Thats the crux of the problem. 

This is tangential, but I don't think evalLater() in your code below is accomplishing anything in particular -- you could call read() immediately with the same effect.

That of course still leaves you with the problem of what to do with the promise returned by read().

What you need to do here is create a loop which repeatedly reads events and dispatches them. Asynchronous loops are accomplished using tail recursion:

    kj::Promise<void> readLoop(kj::AsyncIoStream& readEndOfPipe) {
      kj::Own<int> myidx = kj::heap<int>();
      int& myixdref = *myidx;

      return readEndOfPipe->read(&myixdref, sizeof(int), sizeof(int))
        .then([&readEndOfPipe, myidx = kj::mv(myidx)] (size_t n) {
          auto iter = obj->map_.find(*myixd);
          iter->second->fulfill(1); // TODO use futureValue
          obj->map_.erase(iter);

          // Recursive loop.
          return readLoop(readEndOfPipe);
        });
    }

Note that I converted myidx to a heap variable; you had it on the stack before, but the stack goes away as soon as the outer call returns, long before the inner callback.

Anyway, now somewhere (maybe your main function) you need to actually initiate the loop. There are two ways to do this:
1) Call readLoop() and add the returned promise to a kj::TaskSet, which you don't destroy until you want to shut down.
2) Call readLoop(), then call eagerlyEvaluate() on the returned promise, then store the Promise<void> returned by that off to the side, again only destroying it when you want to shut down, i.e.:

    // Destroy readTask will cancel the loop.
    kj::Promise<void> readTask = readLoop(pipe)
        .eagerlyEvaluate([](kj::Exception&& e) {
      KJ_LOG(ERROR, e);
    });

-Kenton
 

Sandeep Joshi

unread,
Jan 29, 2016, 3:07:06 AM1/29/16
to Kenton Varda, Cap'n Proto
I used the second option and it worked.  Thanks!

It so cool, I can combine multiple boost::promises on the server into a kj::Promise on the client (using kj::joinPromises) and then wait for it.


Reply all
Reply to author
Forward
0 new messages