Best practice to set up bidirectional streams in c++

91 views
Skip to first unread message

vitaliy.o...@gmail.com

unread,
Apr 29, 2020, 2:17:09 PM4/29/20
to Cap'n Proto
Can someone provide a short example of how to implement bi-directional streams with c++ and CapnProto? Thanks!

From my understanding the server to client direction should look something like this, please correct me if I'm wrong :)

interface PointStream {
# Define some point streaming interface.

next @0 (point :PointXYZI) -> stream;
# Server will call this to submit points.

done @1 ();
# Once the stream is done this will be called.
}

pointStream
@0 (callback :PointStream) -> ();
# Define a function to get the point stream.



// =======================================================================================
class PointStreamImpl : public Scan::PointStream::Server {
// An implementation of the PointStream interface wrapping next() and done().
// We're implementing this on the client side and will pass a reference to
// the server. The server will then be able to make calls back to the client.

public:
PointStreamImpl() : doneCalled(false) {}

kj::Promise<void> next(NextContext context) {
KJ_REQUIRE(!doneCalled, "called next() after done()");
auto point = context.getParams().getPoint();

                // ... do something with the point

return kj::READY_NOW;
}

kj::Promise<void> done(NextContext context) {
KJ_REQUIRE(!doneCalled, "can only call done() once");
doneCalled = true;
return kj::READY_NOW;
}

private:
bool doneCalled;
};

Kenton Varda

unread,
Apr 30, 2020, 3:30:00 PM4/30/20
to vitaliy.o...@gmail.com, Cap'n Proto
Hi Vitaliy,

This example isn't "short", but the HTTP-over-Capnp code does bidirectional streaming:


(And the corresponding .h / .c++ files.)

Unfortunately I don't think they'll work great as examples since the logic is pretty complicated to optimize for path shortening and a bunch of other things...

Anyway, what you've written so far is only one part of the solution but it looks right to me. Note that if "point" objects are small and you want high throughput, you might want to batch them (deliver a List(PointXYZI) for each call).

-Kenton

--
You received this message because you are subscribed to the Google Groups "Cap'n Proto" group.
To unsubscribe from this group and stop receiving emails from it, send an email to capnproto+...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/capnproto/543a14f3-216c-44d4-bfd6-4a6cf43f4a77%40googlegroups.com.

Vitaliy Ostapchuk

unread,
Apr 30, 2020, 4:50:02 PM4/30/20
to Cap'n Proto
Hey Kenton,

once again thanks for your help here! I already looked into the HTTP implementation, but as you said there is too much going on around it to clearly separate the stream code without all the wrappers and factories around it. As you already said I have just the receiving part here and struggling a bit to get the sender part working correctly.

kj::Promise<void> pointStream(PointStreamContext context)
{
// get the callback provided by client
auto params = context.getParams();
streamCallback = kj::mv(params.getCallback());

// call next() in a loop ??
// call done() at the end ??

return kj::READY_NOW;
}


If someone can help me out here it would mean a lot! :)

As for sending the data in bigger chunks, this of cause would make sense. This code if for testing the different candidates only so it does not matter for now. Anyway thanks for the advice of cause!

Best regards

Kenton Varda

unread,
Apr 30, 2020, 5:02:48 PM4/30/20
to Vitaliy Ostapchuk, Cap'n Proto
Hi Vitaliy,

The trick to loops with KJ promises is do them recursively, like:

    kj::Promise<void> loop(PointStream::Client stream) {
      auto req = stream.nextRequest();
      req.setPoint(...);
      return req.send().then([stream=kj::mv(stream)]() mutable {
        return loop(kj::mv(stream));
      });
    }

Hope that helps!

-Kenton

--
You received this message because you are subscribed to the Google Groups "Cap'n Proto" group.
To unsubscribe from this group and stop receiving emails from it, send an email to capnproto+...@googlegroups.com.

Vitaliy Ostapchuk

unread,
May 2, 2020, 7:44:02 AM5/2/20
to Cap'n Proto
Hey Kenton,

thanks for the suggestion! I actually managed to get the stream working now (both recursive and iterative). However, I'm still not sure how to join the done() at the end the right way. Any suggestions whats the best way to do that without actually use wait?

Best regards
Vitaliy


Kenton Varda

unread,
May 5, 2020, 8:27:57 PM5/5/20
to Vitaliy Ostapchuk, Cap'n Proto
Hi Vitaliy,

It should just be a matter of adding an if/else, like:

    kj::Promise<void> loop(PointStream::Client stream) {
      if (isDone()) {
        return stream.doneRequest().send().ignoreResult();
      } else {
        auto req = stream.nextRequest();
        req.setPoint(...);
        return req.send().then([stream=kj::mv(stream)]() mutable {
          return loop(kj::mv(stream));
        });
      }
    }

You say you got streaming working "both recursive and iterative", but I'm actually not aware of a way to write this code without using recursion. How did you do it iteratively?

-Kenton

--
You received this message because you are subscribed to the Google Groups "Cap'n Proto" group.
To unsubscribe from this group and stop receiving emails from it, send an email to capnproto+...@googlegroups.com.

Vitaliy Ostapchuk

unread,
May 8, 2020, 11:55:51 AM5/8/20
to Cap'n Proto
Hey Kenton,

this was exactly what I tried to do, but it does not work as intended I think. At least the done() call will never be received in my experimental approaches. Especially if it contains arguments like the size of the message it might be crucial to fix this.

For your interest regarding the iterative approach its pretty simple the reverted recursion. Basically, every recursion can be broken down to the iterative approach, it is not always as nice though, here it does not even make sense. However, you could imagine chaining the requests in a loop, the results are ignored here anyway:

auto it = points.begin();
while (it != points.end()) {
auto request = stream.nextRequest();
auto point = *it;
request.setPoint(...);
request.send();
it++;
}

I finished my research on the framework and am really happy with it! However, the biggest problem I stumbled on is the different state of the supported languages. We will definitely discuss it as an option, but not sure it already fulfills all the requirements we have. 
Anyway, great thanks for the fast responses and good luck on continue to push it!

Best regards
Vitaliy

Kenton Varda

unread,
May 11, 2020, 11:47:16 AM5/11/20
to Vitaliy Ostapchuk, Cap'n Proto
On Fri, May 8, 2020 at 10:55 AM Vitaliy Ostapchuk <vitaliy.o...@gmail.com> wrote:
Hey Kenton,

this was exactly what I tried to do, but it does not work as intended I think. At least the done() call will never be received in my experimental approaches. Especially if it contains arguments like the size of the message it might be crucial to fix this.

It sounds like something is wrong with your code, then. I'd have to see the code to figure out why it isn't working. Based on what you said below, though, I wonder if you are ignoring the promise returned by send() and therefore prematurely cancelling the "done" request? You always need to to wait for a result even if you don't plan to use the result for anything.
 
For your interest regarding the iterative approach its pretty simple the reverted recursion. Basically, every recursion can be broken down to the iterative approach, it is not always as nice though, here it does not even make sense. However, you could imagine chaining the requests in a loop, the results are ignored here anyway:

auto it = points.begin();
while (it != points.end()) {
auto request = stream.nextRequest();
auto point = *it;
request.setPoint(...);
request.send();
it++;
}

This does not do what you want. This will always send all of the stream requests all at once, bloating up the queue on the sending side which will cause subsequent requests on the connection to be delayed until the entire stream is delivered. In other words, you lose the flow control, which is the whole point of the new streaming feature.

To avoid this, you need to wait for the promise returned by send() to finish before starting the next request. Until we get async/await support (C++20), this can only be done with a recursive approach.

Also note that destroying a promise technically means you want to cancel it. Since you are ignoring the promise returned by send(), it gets destroyed immediately, which requests that the call be canceled. This causes a cancellation request to be sent to the remote side, which may or may not actually cancel the request depending on whether it has been delivered already and whether the application has opted into listening for cancellation.

-Kenton
Reply all
Reply to author
Forward
0 new messages