On 09/09/13 05:43, Kenton Varda wrote:
> Great, so we can use 0mq's framing. I wonder if it makes sense to
> define this as the canonical way to send Cap'n Proto messages over
> 0mq, and maybe provide some reference code (which I imagine is not
> very long).
Here is something that does the job:
// Bind the sender to the socket it will write to by initializing s_ from s.
// NOTE(review): s_ is presumably a reference member, as in ZmqReceiver, in
// which case the socket must outlive the sender -- confirm against the
// ZmqSender class declaration.
ZmqSender::ZmqSender(zmqpp::socket& s)
    : s_(s)
{
}
// Send a message provided as a capnp segment list. Each segment is sent
as a separate zmq message part.
void ZmqSender::send(kj::ArrayPtr<kj::ArrayPtr<capnp::word const> const>
segments)
{
auto it = segments.begin();
auto i = segments.size();
assert(i != 0);
while (--i != 0)
{
s_.send_raw(reinterpret_cast<char const*>(&(*it)[0]),
it->size() * sizeof(capnp::word), zmqpp::socket::send_more);
++it;
}
s_.send_raw(reinterpret_cast<char const*>(&(*it)[0]), it->size() *
sizeof(capnp::word), zmqpp::socket::normal);
}
The receiver is a bit messy, due to a bit of impedance mismatch with the
zmq API (I have to unmarshal into a std::string). The check for a
misaligned string buffer is there because the standard doesn't
guarantee that a std::string buffer has any particular alignment, as far
as I know. Obviously, the ZmqReceiver instance must remain in scope
until after unmarshaling is complete, because the returned segment list
points into buffers that the receiver owns.
// Receives Cap'n Proto messages from a zmqpp socket and exposes them as a
// capnp segment list.
//
// The receiver owns the storage behind the most recently received message,
// so it must stay alive for as long as the result of receive() is in use.
class ZmqReceiver final : private util::NonCopyable
{
public:
    // Only a reference to the socket is stored; the socket must outlive
    // the receiver. explicit prevents a socket from silently converting
    // into a receiver.
    explicit ZmqReceiver(zmqpp::socket& s);

    // Receive one message (one or more parts) and return it as a segment
    // list. The returned list points into this object's members and is
    // invalidated by the next call to receive().
    kj::ArrayPtr<kj::ArrayPtr<capnp::word const> const> receive();

private:
    zmqpp::socket& s_;                                          // Socket to read from (not owned).
    std::vector<std::string> parts_;                            // Raw message parts as received.
    std::vector<std::unique_ptr<capnp::word[]>> copied_parts_;  // Word-aligned copies of parts whose string buffer was not aligned.
    std::vector<kj::ArrayPtr<capnp::word const>> segments_;     // One entry per part, pointing into parts_ or copied_parts_.
};
// Bind the receiver to the socket it will read from. Only a reference is
// kept in s_, so the socket must outlive the receiver.
ZmqReceiver::ZmqReceiver(zmqpp::socket& s)
    : s_(s)
{
}
// Receive a message (as a single message or in parts) and convert to a
capnp segment list.
kj::ArrayPtr<kj::ArrayPtr<capnp::word const> const> ZmqReceiver::receive()
{
// Clear previously received content, if any.
parts_.clear();
copied_parts_.clear();
segments_.clear();
do
{
parts_.push_back(string());
string& str = parts_.back();
s_.receive(str);
assert(str.size() % sizeof(capnp::word) == 0); //
Received message must contain an integral number of words.
auto num_words = str.size() / sizeof(capnp::word);
char* buf = &str[0];
if (reinterpret_cast<uintptr_t>(buf) % sizeof(capnp::word) == 0)
{
// String buffer is word-aligned, point directly at the
start of the string.
segments_.push_back(kj::ArrayPtr<capnp::word
const>(reinterpret_cast<capnp::word const*>(buf), num_words));
}
else
{
// String buffer is not word-aligned, make a copy and point
at that.
unique_ptr<capnp::word[]> words(new capnp::word[num_words]);
memcpy(words.get(), buf, str.size());
segments_.push_back(kj::ArrayPtr<capnp::word
const>(&words[0], num_words));
copied_parts_.push_back(move(words));
}
}
while (s_.has_more_parts());
return kj::ArrayPtr<kj::ArrayPtr<capnp::word const>>(&segments_[0],
segments_.size());
}
Cheers,
Michi.