[C++] Fastest way to deserialize multiple messages in a file


Farid Zakaria

Jul 20, 2017, 5:11:04 PM
to Cap'n Proto
I'm looking for guidance on the fastest way to read a file that contains multiple messages (i.e., multiple message roots).

So far I have this written:

#include <capnp/serialize.h>  // capnp::InputStreamMessageReader
#include <kj/io.h>            // kj::FdInputStream, kj::BufferedInputStreamWrapper

// What is a good size for the scratch buffer? Is it fine as long as it's smaller?
capnp::word scratch[1024];
kj::ArrayPtr<capnp::word> scratchSpace(scratch);
kj::FdInputStream stream(fd);
kj::BufferedInputStreamWrapper buff(stream);

unsigned long count = 0;
while (buff.tryGetReadBuffer().size() != 0) {
  capnp::InputStreamMessageReader message(buff, capnp::ReaderOptions(), scratchSpace);
  Message::Reader chunk = message.getRoot<Message>();
  count++;
}

I tried the helper functions first, but I think they were too slow without the BufferedInputStreamWrapper.

I don't do much C++ so I appreciate any help :)
By the way, is there an IRC channel or something similar?

Kenton Varda

Jul 20, 2017, 5:42:44 PM
to Farid Zakaria, Cap'n Proto
Hi Farid,

Try using mmap() (disclaimer: haven't tried compiling this):

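#include <sys/mman.h>
#include <sys/stat.h>
#include <cerrno>
#include <kj/debug.h>
#include <capnp/serialize.h>

struct stat stats;
KJ_SYSCALL(fstat(fd, &stats));
size_t size = stats.st_size;
// Not const: munmap() and madvise() take a plain void*.
void* data = mmap(nullptr, size, PROT_READ, MAP_PRIVATE, fd, 0);
if (data == MAP_FAILED) {
  KJ_FAIL_SYSCALL("mmap", errno);
}
// Unmap when this scope exits; `{ break; }` makes a munmap() failure recoverable rather than fatal.
KJ_DEFER(KJ_SYSCALL(munmap(data, size)) { break; });
// Hint to the kernel that the file will be read front to back.
KJ_SYSCALL(madvise(data, size, MADV_SEQUENTIAL));

// The mapping is read-only, so view it as an array of const words.
kj::ArrayPtr<const capnp::word> words(
    reinterpret_cast<const capnp::word*>(data),
    size / sizeof(capnp::word));

size_t count = 0;
while (words.size() > 0) {
  // Parses the message in place; the file data is not copied.
  capnp::FlatArrayMessageReader message(words);
  Message::Reader chunk = message.getRoot<Message>();
  count++;
  // Advance to the first word after this message.
  words = kj::arrayPtr(message.getEnd(), words.end());
}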

(On Windows you'll need to use CreateFileMapping() and MapViewOfFile() or whatever.)

-Kenton

--
You received this message because you are subscribed to the Google Groups "Cap'n Proto" group.
To unsubscribe from this group and stop receiving emails from it, send an email to capnproto+unsubscribe@googlegroups.com.
Visit this group at https://groups.google.com/group/capnproto.

Farid Zakaria

Jul 20, 2017, 6:40:29 PM
to Cap'n Proto, farid.m...@gmail.com
Is mmap() the only way to seek to an arbitrary offset in the file?
I can't find a way to do that with kj::FdInputStream.

I'm trying to create an index of the elements in the file.

Farid Zakaria

Jul 20, 2017, 7:14:16 PM
to Cap'n Proto, farid.m...@gmail.com
One more question =)

I need to copy the root from an FdInputStream into a vector.
Do I need to copy it into a MallocMessageBuilder?


while (buff.tryGetReadBuffer().size() != 0) {
  capnp::InputStreamMessageReader message(buff, capnp::ReaderOptions()); //, scratch);
  Message::Reader chunk = message.getRoot<Message>();

  capnp::MallocMessageBuilder messageBuilder;
  messageBuilder.setRoot(chunk);
  messages->at(index++) = messageBuilder.getRoot<Message>();
}

Kenton Varda

Jul 20, 2017, 7:35:29 PM
to Farid Zakaria, Cap'n Proto
On Thu, Jul 20, 2017 at 3:40 PM, Farid Zakaria <farid.m...@gmail.com> wrote:
Is MMAP the only way to randomly seek to an offset in the file?
I can't seem to find a way to do that with kj::FdInputStream ?

I'm trying to create an index of the elements in the file.

kj::InputStream doesn't assume the stream is seekable and doesn't track the current location. You could create a custom wrapper around InputStream or around BufferedInputStream that remembers how many bytes have been read. You can also lseek() the underlying fd directly, though of course you'll have to discard any buffers after that.

But indeed, if you use mmap() this will all be a lot easier, and faster. I highly recommend using mmap() here.

On Thu, Jul 20, 2017 at 4:14 PM, Farid Zakaria <farid.m...@gmail.com> wrote:
One more question =)

I need to copy the root from a FdStream to a vector
Do I need to copy it into a MallocMessageBuilder ?

With InputStreamMessageReader, yes. You have to destroy the InputStreamMessageReader before you can read the next message, and that invalidates the root Reader and all other Readers pointing into it.

However, with the mmap strategy, you don't need to delete the FlatArrayMessageReader before reading the next message. So, you can allocate them on the heap and put them into your vector, and then all the Readers pointing into them remain valid, as long as the FlatArrayMessageReaders exist and the memory is still mapped. (In this case you should remove the madvise() line since you plan to go back and randomly access the data later.)

Again, I *highly* recommend this strategy instead of using a stream. With the mmap strategy, not only do you avoid copying into a builder, but you avoid copying the underlying data when you read it. The operating system causes the memory addresses to point directly at its in-memory cache of the file data. If multiple programs mmap() the same file, they share the memory, rather than creating their own copies. Moreover, the operating system is free to evict the data from memory and then load it again later on-demand. There are tons of advantages to this approach and it is exactly what Cap'n Proto is designed to enable.

-Kenton

Farid Zakaria

Jul 20, 2017, 8:25:00 PM
to Cap'n Proto, farid.m...@gmail.com
All the items in my message array always seem to point to the last item read.
I'm not sure what I'm doing wrong here.

auto messages = std::make_unique<std::deque<Message::Reader *> >(10);
while (words.size() > 0) {
  capnp::FlatArrayMessageReader * reader = new capnp::FlatArrayMessageReader(words);
  Message::Reader message = reader->getRoot<Message>();
  words = kj::arrayPtr(message->getEnd(), words.end());
  messages->at(index++) = & message;
}

Farid Zakaria

Jul 20, 2017, 8:30:30 PM
to Cap'n Proto, farid.m...@gmail.com
Did I actually have to store the FlatArrayMessageReader, rather than the Message::Reader, for it to work?
I don't think I'm grokking why that matters. I thought a FlatArrayMessageReader was just a pointer into the mmap()ed file.
Why would it matter if I cast it to the Reader?


hmm.

Farid Zakaria

Jul 20, 2017, 8:44:51 PM
to Cap'n Proto, farid.m...@gmail.com
Finally (sorry I keep sending separate messages):

The reason I was looking for an FdInputStream solution is that it seems to be much faster than the mmap() solution.
My file is quite large (10 GB), but memory is not much of a concern.

How does one copy from an InputStreamMessageReader into the MallocMessageReader?

Kenton Varda

Jul 20, 2017, 11:03:33 PM
to Farid Zakaria, Cap'n Proto
On Thu, Jul 20, 2017 at 5:25 PM, Farid Zakaria <farid.m...@gmail.com> wrote:
All the items in my message array seem to be always pointing to the last item read.
I'm not sure what I'm doing wrong here.

auto messages = std::make_unique<std::deque<Message::Reader *> >(10);
while (words.size() > 0) {
  capnp::FlatArrayMessageReader * reader = new capnp::FlatArrayMessageReader(words);
  Message::Reader message = reader->getRoot<Message>();
  words = kj::arrayPtr(message->getEnd(), words.end());
  messages->at(index++) = & message;
}
There are multiple problems with this code. They are C++ usage errors, not specifically Cap'n Proto related.

    messages->at(index++) = & message;

First, on this line you are taking the address of a temporary stack object (`message`). That object then goes out-of-scope, so this pointer is no longer valid. But you are storing the pointer in a long-lived object. You should make your deque contain instances of `Message::Reader`, not `Message::Reader*`.

Second, on this same line, there's no guarantee that `index` is a valid index into your deque. It looks like you're allocating a 10-element deque but if there are more than 10 messages you're running off the end of the deque.

Third, though it wouldn't prevent the code from functioning, it has a memory leak:

    capnp::FlatArrayMessageReader * reader = new capnp::FlatArrayMessageReader(words);

You aren't ever deleting this object that you created with `new`.

-Kenton
 


Kenton Varda

Jul 20, 2017, 11:06:41 PM
to Farid Zakaria, Cap'n Proto
On Thu, Jul 20, 2017 at 5:44 PM, Farid Zakaria <farid.m...@gmail.com> wrote:
Finally (sorry I keep making separate messages) -- 

The reason why I was seeking a FdInputStream solution is because it seems to be much faster than an MMAP solution.
Although my file is quite large (10GB) -- memory is not much of a concern.

This is very surprising. Can you show your complete code that is faster with InputStreamMessageReader than with mmap()? Probably there is a problem in the code that causes the difference.

How does one copy from InputStreamMessageReader into the MallocMessageReader ?

I assume you mean MallocMessageBuilder. You would do:

    builder.setRoot(reader.getRoot<Type>());

-Kenton
 
