Actually, I think this is two hand-wavy. Also I think the original inclusive join is actually correct because I want to ensure that both sides finish any I/O that may be in flight. Otherwise I may end the stream prematurely just because 1 end finished (e.g. 1 end sends some data & then closes because its done - the peer won't receive all the data).
My current code looks something like:
void
completelyClose(kj::AsyncIoStream& stream) {
stream.shutdownWrite();
stream.abortRead();
};
kj::Canceler stillRunning;
auto
stream1
= ioContext.lowLevelProvider->wrapSocketFd(
rawSocket, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
stillRunning.wrap(listener->accept().then([&](kj::Own<kj::AsyncIoStream>&& stream2) mutable {
auto paf = kj::newPromiseAndFulfiller<void>();
auto unsafeStream2 = stream2.get();
kj::Vector<kj::Promise<void>> pumped;
pumped.add(stream1->pumpTo(stream2)
.ignoreResult()
.then(
[stream2 = stream2.get()] { stream2->shutdownWrite(); },
[&stream1, stream2 = stream2.get()] (kj::Exception&& e) {
completelyClose(*stream1);
completelyClose(*stream2);
});
pumped.add(unsafeStream2->pumpTo(stream1)
.ignoreResult()
.then(
[&stream1] { stream1->shutdownWrite(); },
[&stream1, stream2 = unsafeStream2] (kj::Exception&& e) {
completelyClose(*stream1);
completelyClose(*stream2);
}));
return kj::joinPromises(pumped.releaseAsArray())
.attach(
paf.promise.fork(),
// AcceptedConnection simply fulfills on destruction.
kj::heap<AcceptedConnection>(kj::mv(stream2), kj::mv(paf.fulfiller)),
);
})).wait(waitScope);
The fulfiller stuff is another place I'm pretty sure I haven't done right. I was just going off of what's happening under the hood when you wait on the promise that TwoPartyServer returns when it listens.