C++ Bidirectional Stream Back to Client


Matt Miller

May 7, 2020, 5:54:10 PM
to Cap'n Proto
I'm new to Cap'n Proto and was experimenting with bidirectional streams. At a high level, what I'd like to do is, in sequence:

1. Client streams some chunks to the server.
2. Client tells the server it is done with its stream.
3. Server streams back some chunks to the client.
4. Server tells the client it is done with its stream.

Based on a post from a few days ago, I have (1) and (2) working, but was trying to figure out how to do (3) and (4). (3) happens in response to (2).

I'm using this IDL for illustration:

interface StreamService {
  reqReply @0 (replySvc :ReplyCallback) -> (reqSvc :RequestCallback);

  interface RequestCallback {
    sendChunk @0 (chunk :Text) -> stream;
    done @1 ();
  }

  interface ReplyCallback {
    sendChunk @0 (chunk :Text) -> stream;
    done @1 ();
  }
}

And the server is implemented as:

struct ReqSvcImpl final: public StreamService::RequestCallback::Server {

    explicit ReqSvcImpl(StreamService::ReplyCallback::Client replySvc) :
        _replySvc(kj::mv(replySvc)) {}

    // Called once per chunk streamed by the client; stores the chunk.
    kj::Promise<void>
    sendChunk(SendChunkContext context) override
    {
        auto chunk = context.getParams().getChunk();
        std::cout << "Received \"" << chunk.cStr() << "\"" << std::endl;
        _vec.push_back(std::string(chunk.cStr()));
        return kj::READY_NOW;
    }

    // Called when the client has finished its stream; prints what was received.
    kj::Promise<void>
    done(DoneContext context) override
    {
        std::cout << "done called:" << std::endl;
        for (const auto& str : _vec) {
            std::cout << "  " << str << std::endl;
        }

        return kj::READY_NOW;
    }

private:

    StreamService::ReplyCallback::Client _replySvc;
    std::vector<std::string> _vec;
};

struct StreamServiceImpl final: public StreamService::Server {
    kj::Promise<void>
    reqReply(ReqReplyContext context) override
    {
        auto replySvc = context.getParams().getReplySvc();
        context.getResults().setReqSvc(kj::heap<ReqSvcImpl>(replySvc));
        return kj::READY_NOW;
    }
};

On the client, I followed the pattern mentioned in an earlier post, using recursive promises to send the stream to the server:

struct ReplySvcImpl final: public StreamService::ReplyCallback::Server {
    // Similar to ReqSvcImpl on the server, with a sendChunk/done function that prints the chunks
};


static kj::Promise<void>
sendStrings(StreamService::RequestCallback::Client stream, uint32_t strNum,
            uint32_t stopNum)
{
    if (strNum == stopNum) {
        return stream.doneRequest().send().ignoreResult();
    }

    std::string str("Test " + std::to_string(strNum));
    auto chunkReq = stream.sendChunkRequest();
    chunkReq.setChunk(str.c_str());
    return chunkReq.send().then([stream=kj::mv(stream),
                                 strNum, stopNum]() mutable {
            return sendStrings(kj::mv(stream), strNum + 1, stopNum);
        });
}


int main(int argc, const char* argv[])
...
    auto req = svc.reqReplyRequest();
    auto reqSvc = req.send().getReqSvc();
    auto promise = sendStrings(reqSvc, 1, 11);
    promise.wait(waitScope);

I'm trying to figure out what I should do/return in the server's ReqSvcImpl::done() function and what, if anything, I need to do differently on the client to wait for the server's stream to be received and finished.

I'm assuming ReqSvcImpl::done() needs to return a different promise object, but I'm not sure exactly what that promise object should look like or how it incorporates the sendChunkRequest/doneRequest calls back to the client.

Thanks,

Matt

Kenton Varda

May 11, 2020, 12:01:12 PM
to Matt Miller, Cap'n Proto
Hi Matt,

There are really two design options here:

1. Have RequestCallback::done() not return until the whole reply has been streamed. In this case, you can have the RequestCallback::done() implementation invoke the recursive loop to stream data back to the client. This is also nice in that if an exception is thrown in the process, it'll propagate back to the client.

2. Have RequestCallback::done() return immediately, and then the server streams to the client asynchronously. In this case, the RequestCallback::done() implementation would still *start* the promise loop to stream the reply, but it would save the kj::Promise<void> off to the side (e.g. in a member variable) instead of returning it. As long as that promise stays alive, it'll keep running. The client will need to make sure not to destroy the RequestCallback::Client until all the reply data has been received, otherwise it might inadvertently cancel the reply stream.

A major problem with #2 is that exceptions don't go anywhere. If an exception is thrown, the server just stops streaming. The Promise goes into the rejected state, but since no one is waiting for it, no one ever learns about the exception. You *should* call .eagerlyEvaluate(errorHandler) on the promise to provide an error handler function that at least logs the error or something. But, that's still not great, since ideally the error would propagate to the client somehow. Hence I recommend design #1 instead.

Note that in either design, you could consider having the client pass ReplyCallback as a parameter to RequestCallback::done(), rather than to reqReply(). This works since the server doesn't start responding until the client has sent its whole request stream, as you said. This is mostly an aesthetic design choice but sometimes one way or the other might be easier to deal with in the code.
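
As a sketch of that alternative, the schema from the original post could be rearranged as follows. Only the position of the `replySvc` parameter changes; the method names and ordinals are kept from the original, and the file ID line is omitted as it was there.

```capnp
interface StreamService {
  # reqReply() no longer takes the ReplyCallback.
  reqReply @0 () -> (reqSvc :RequestCallback);

  interface RequestCallback {
    sendChunk @0 (chunk :Text) -> stream;

    # The client hands over its reply capability only once its own
    # request stream is complete.
    done @1 (replySvc :ReplyCallback);
  }

  interface ReplyCallback {
    sendChunk @0 (chunk :Text) -> stream;
    done @1 ();
  }
}
```

With this layout, the server's done() implementation would read the capability from its own parameters (context.getParams().getReplySvc()) rather than from a member set in the constructor.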

-Kenton
