Event streaming with FlatBuffers


Maxim Zaks

Jan 11, 2017, 12:35:10 PM
to FlatBuffers
There was a discussion about whether FlatBuffers is suitable for streaming.
I want to present how I use it, and I am very much open to suggestions.

So first of all, the use case:
I want to record events in chronological order, and be able to store/send a vector of those events.
Let's say this is a simplified fbs schema:
table Event {
  time: long;
  message: string;
  type: byte;
}

table Log {
  events: [Event];
}

root_type Log;

Let's say I have a Logger, which carries a flat buffer builder.
On every event I add a new Event table to the buffer and store its returned offset in a list.
When I need to store or send the log, I build the Log table from the stored offsets and finish the buffer.
Now, if I want to stream the events, I have two options:
1. I always send a batch of X events, or send every X seconds. Here the Logger can give a Sender the flat buffer builder and the offset list, and create new ones for itself. The Sender can finish the buffer and send it on another thread, while the Logger starts building up the next batch immediately. (See the sketch below.)
2. I send the current state of the log every X seconds. In this case the Logger would need to create a deep copy of the flat buffer builder and the list, and give those to the Sender. The Sender acts as described in case (1), while the Logger keeps adding events to the original builder and list.
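
Here is a minimal C++ sketch of option 1, assuming flatc-generated code for the schema above (CreateEvent, CreateLog and FinishLogBuffer would be the generated helpers; the header name log_generated.h is made up). For brevity the batch is finished before hand-off rather than by the Sender, and error handling/locking are omitted:

#include <memory>
#include <string>
#include <vector>

#include "flatbuffers/flatbuffers.h"
#include "log_generated.h"  // hypothetical name of the flatc-generated header

class Logger {
 public:
  // Record one event into the current batch.
  void AddEvent(int64_t time, const std::string& message, int8_t type) {
    auto msg = builder_->CreateString(message);
    offsets_.push_back(CreateEvent(*builder_, time, msg, type));
  }

  // Hand the current batch over (e.g. to a Sender thread) and start a fresh one.
  std::unique_ptr<flatbuffers::FlatBufferBuilder> TakeBatch() {
    auto events = builder_->CreateVector(offsets_);
    FinishLogBuffer(*builder_, CreateLog(*builder_, events));
    auto done = std::move(builder_);
    builder_ = std::make_unique<flatbuffers::FlatBufferBuilder>();
    offsets_.clear();
    return done;  // done->GetBufferPointer() / done->GetSize() are ready to send
  }

 private:
  std::unique_ptr<flatbuffers::FlatBufferBuilder> builder_ =
      std::make_unique<flatbuffers::FlatBufferBuilder>();
  std::vector<flatbuffers::Offset<Event>> offsets_;
};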


Wouter van Oortmerssen

Jan 11, 2017, 2:50:07 PM
to Maxim Zaks, FlatBuffers
Option 1 sounds a lot easier to me, with guaranteed good performance.


Maxim Zaks

Jan 12, 2017, 5:52:49 AM
to FlatBuffers, maxim...@googlemail.com
This option also seems the simplest and most efficient to me.
One pitfall, though: the receiver would have to do more work if it needs to aggregate events by session.
But then again, this is only problematic in a real-time monitoring scenario.
And there you probably do not want to send batches in the first place, but rather send event by event.
I guess for offline mode, or a mode where a delay is tolerable, option 1 is the way to go.

Thanks

Piyush Mishra

Jan 12, 2017, 3:19:02 PM
to FlatBuffers, maxim...@googlemail.com
Pardon me if this sounds too noob, because I am.

But unless one has fixed header material in the logs (which does not sound very common, and can easily be represented by a field in the event itself), why not just length-prefix the events and send them over?

If inconsistent behaviour across languages and implementation decisions are the problem, one can easily abstract them away with a helper lib that generates code.

I've been toying with that idea on and off at https://github.com/daakia for a while, and would love to know the reason behind trying to do this with just FlatBuffers.

At its core, why have the extra layer of indirection?

Maxim Zaks

Jan 12, 2017, 6:00:21 PM
to FlatBuffers, maxim...@googlemail.com
Are you familiar with the law of the instrument? 😂

OK, now a more serious answer. These are the benefits I see in using FlatBuffers:
1. Structured log statements. The example I brought up is quite basic, but in reality I can define an event which is much more complex/elaborate. Consuming such an event is possible without text parsing. It is also possible to read flat buffer log files without loading the whole content into memory (see the sketch after this list). It would be much slower, but if we are talking about hundreds of megabytes of data, you might want to trade speed for memory consumption.
2. FlatBuffers has a schema evolution strategy. This is the main selling point compared to hand-written serialisation. Logs are something that you might store for a long time, so it is very beneficial if the latest reader can consume all versions of the logs. This is something you get with FlatBuffers for free.
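
As a rough illustration of point 1: if a finished buffer has been dumped to a file, you can mmap it and touch only the events you actually look at. This sketch assumes POSIX mmap and flatc-generated C++ code for the schema; the file name is made up and error handling is omitted:

#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>

#include <cstdio>

#include "log_generated.h"  // hypothetical name of the flatc-generated header

int main() {
  int fd = open("session.log.fb", O_RDONLY);
  struct stat st;
  fstat(fd, &st);
  void* data = mmap(nullptr, st.st_size, PROT_READ, MAP_PRIVATE, fd, 0);

  // GetLog is generated for the root type; events are read lazily in place,
  // so only the pages actually touched are loaded from disk.
  auto log = GetLog(data);
  std::printf("%u events, first message: %s\n",
              log->events()->size(),
              log->events()->Get(0)->message()->c_str());

  munmap(data, st.st_size);
  close(fd);
}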

I am already using FlatBuffers for a WIP event logging tool. 
I am not streaming the events currently; I am just recording the whole session and dumping it to disk at the end of the session.
This is OK for my current needs, but might not be sufficient in a more complex scenario.

If you are interested, I made a screencast showing how the tool works:
(Disclaimer: very early stage, and the video was recorded for internal purposes, so it could be hard to understand.)

To boil down the facts: I record 48,056 events which take up 2.3 MB, so roughly 50 bytes per event.
This seems like a lot; however, the events are already indexed and grouped for fast querying.
This is how I can display over 48K events, have hyperlinks, and scrub through them in a simple HTML page without any significant slowdowns.
The initial loading would be much slower if I had to parse 48K events upfront.

mikkelfj

Jan 13, 2017, 2:16:29 AM
to FlatBuffers, maxim...@googlemail.com
I can add how you could do this in flatcc. While this is particular to C, other builders could use the same principle.

flatcc has a stack, so the vector of log events sits below the current event being added. Whenever a log event is completed, it is emitted to the final buffer storage and its offset is pushed onto the internal builder stack. In this way there is no need to keep track of offsets, because the builder does it for you.

Eventually you need to close the vector. At this point the flatcc builder will convert the offsets on the stack to make them relative, and if necessary do endian conversion. Then the vector is emitted to the buffer storage. After that you would normally close the buffer, but you could also have a higher-level vector of vectors until it also grows large.

The emitter storage can be custom managed. By default it is a paged design, so it implements a queue that is filled back to front. Once the buffer is completed, the finalize operation copies all the pages into a user-supplied buffer, or it allocates an aligned buffer, or it uses malloc, depending on the exact operation used.

The emitter can be modified to transmit pages over a network long before the vector of log events completes. The receiver will not be able to read the data, but can collect the pages and do the recombination when the entire buffer is received. This recombination is entirely mechanical - requiring no knowledge of FlatBuffers - so the receiving code can implement it any way it likes, as long as the first page is stored last. The first and last pages are not necessarily full and must also be handled. If FlatBuffers had stored front to back, pages could have been streamed directly to disk in this design.

This means the overall space consumption locally can be limited to the size of the offset vectors you want to use.

Piyush Mishra

Jan 13, 2017, 10:54:57 AM
to mikkelfj, FlatBuffers, Maxim Zaks
@Maxim, my question was related more to the streaming side of things than to why one would use FB for the "events" themselves.

I see great use for storing events; even if we ignore everything else, just the forward-compatibility bit is a super win on its own.

@mikkel, in <typeinfo byte><length><fb event> <typeinfo byte><length><fb event> mode, things are by default storable directly to disk, or if one wants to get fancy with them, one can build a vtable on the receiving end.

All I am really trying to ask is: why not keep FB up to the event level, and use some standardized code generation, or even existing MQs, on top to handle transmission?
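
For concreteness, a rough sketch of that framing, assuming the standard C++ FlatBufferBuilder (the function name and stream handling are made up): each finished event buffer is written with a type byte and a length in front of it.

#include <cstdint>
#include <ostream>

#include "flatbuffers/flatbuffers.h"

// Write one <typeinfo byte><length><fb event> frame to a stream.
// The length is written in native byte order to keep the sketch short.
void WriteFramedEvent(std::ostream& out, uint8_t type_info,
                      const flatbuffers::FlatBufferBuilder& fbb) {
  uint32_t len = fbb.GetSize();
  out.put(static_cast<char>(type_info));
  out.write(reinterpret_cast<const char*>(&len), sizeof(len));
  out.write(reinterpret_cast<const char*>(fbb.GetBufferPointer()), len);
}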


mikkelfj

Jan 13, 2017, 11:13:09 AM
to FlatBuffers, mik...@dvide.com, maxim...@googlemail.com
@mikkel, In <typeinfo byte><length><fb event> <typeinfo byte><length><fb event> mode, things are by default storable directly to disk or if one wants to go fancy with them, one can build a vtable on the receiving end.

Well, in the scenario I described, you can also just store the data to disk and read it without the full buffer, or even without the vector. You still need the vtables, but these are available as well. I didn't mention it, but vtables are transmitted at the back of the emit queue, so if you store those on a separate disk, or send them over the network, they are available immediately. This is actually designed that way for this reason.

Sending the vector adds the index structure, but strictly speaking, if you know how the data is being built, you can reconstruct it on the receiving end. It is, however, more work than just storing pages and recombining them.

A custom format as you propose doesn't really add anything - you just replace the vtable offset with a type field and a length field which you don't need. If you don't add a separate index, you can't easily navigate the data and need a linear scan over it. This won't support binary searching. However, you can use the framing temporarily during transport:

For completeness of discussion: the emitter works with a virtual address, which is a negative offset from the end of the emit queue, before the vtables start, while the vtables get positive offsets (+1 to avoid a zero offset, which indicates an error). These virtual addresses are the ones translated when the vector is completed. But each table emit operation can also be framed as you suggest; then you can rebuild the vector without actually sending the vector. Unless you need realtime read access, it is much simpler to send vectors in batches and possibly index these vectors in another vector. Even if the buffer isn't completed, you now only need to track the location of each vector in a receiver index. The vectors will use relative addressing.

Piyush Mishra

Jan 13, 2017, 1:25:02 PM
to mikkelfj, FlatBuffers, Maxim Zaks
On Fri, Jan 13, 2017 at 9:43 PM, mikkelfj <mik...@dvide.com> wrote:
@mikkel, In <typeinfo byte><length><fb event> <typeinfo byte><length><fb event> mode, things are by default storable directly to disk or if one wants to go fancy with them, one can build a vtable on the receiving end.

Well, in the scenario I described, you can also just stored the data to disk and read them without the full buffer or even vector. You still need the vtable, but these are available as well. 
I didn't mention it but vtables are transmitted to at the back of the emit queue, so if you store those on a separate disk, or send them over the network, they are available immediately. This is actually designed that way for this reason.

Is this the "meta-vtable" that contains addresses of each individual event throughout history? 

A question about internals of fb. When you say vector, is each individual "event" packed back to back? If that is so, then one could update the meta-vtable and drop the interim vtable of each individual "batch" and store the whole batch of events to disk in one go.

That is pretty neat if it is so! \m/

A custom format as you propose doesn't really add anything - you just replace the vtable offset with a type field and length field which you don't need.

Yes, I agree; in my fanboy moment I skipped thinking along that line. What it adds is low overhead on the sending end. You can keep a dumb buffer on that end and leave all the indexing and compute for the receiving side, which typically has resources set aside for these tasks. Then there are other fancy tasks you can only do on the receiving end:
 
Use case: Multiple services logging events with some header/type info, which can then be used to aggregate events and build a separate vtable for a vector of chronological events during a request/distributed job. At the same time, you build the standard vtable that comes by default: "here's a list of all log events, in order, from Service A".

FB can be used here too, and it would be really cool if the statement about packing vectors back to back is true. You lose true realtime, but realtime is a relative concept anyway; with a small enough adjustable time window to fill the buffer, it's practically realtime.

But if that statement is not true, then it is practically the same as sending framed messages, with additional indirection and overhead on the sending end :( No one likes paying for logging/tracing stuff in their application code.

For completeness of discussion, the emitter works with a virtual address which is a negative offset from end of emit queue, before the vtable starts, and the vtable are positive offsets (+1 to avoid a zero offset which indicates error). These virtual addresses are the ones translated when the vector is completed. But each table emit operation can also be framed as you suggest. then you can rebuild the vector without actually sending the vector. Unless you need realtime read access, it is much simpler to send vectors in batches and possible index these vectors in another vector. Even if the buffer isn't completed, you know only need to track the location of each vector in a receiver index. The vectors will use relative addressing.

When you say "rebuild the vector", it means you already have it. In that case, ignore everything I said. The kind of use cases I was talking about are more along the lines of "I have individual tables; how do I send them over?"

Disclaimer: I am not familiar with C++/internals of fb.
Wherever I write vtable, I actually mean a lookup table/squint-eyes-index. "vtable" in fb might mean something else entirely. If that is so, sorry for the confusion.

mikkelfj

Jan 13, 2017, 2:23:20 PM
to FlatBuffers, mik...@dvide.com, maxim...@googlemail.com

I didn't mention it but vtables are transmitted to at the back of the emit queue, so if you store those on a separate disk, or send them over the network, they are available immediately. This is actually designed that way for this reason.

Is this the "meta-vtable" that contains addresses of each individual event throughout history? 
No, vtables are vtables: they specify which fields are present in a table, and where. In Google's flatc they are placed earlier in the buffer if not already present. In flatcc they are clustered at the end for better CPU caching, and to allow ahead-of-time access in an organized way if you need to hack the streaming data. Clustering can be disabled, and is disabled automatically for nested buffers since they can't mix with the parent.


A question about internals of fb. When you say vector, is each individual "event" packed back to back? If that is so, then one could update the meta-vtable and drop the interim vtable of each individual "batch" and store the whole batch of events to disk in one go.
 
Well, sort of. There aren't truly any guarantees, and yet there are. Each call to the customizable emitter is guaranteed to be a complete flatbuffer object (string, vector, table, or struct if using a struct as the buffer root), so if you know what is being generated, you can make valid assumptions, or you can pass information out of band from the caller to the builder and the emitter, e.g. set a global type variable. The guarantee comes from the fact that emit happens when an object ends, and that all child elements will already have ended before that (i.e. they are later in the buffer, incl. clustered vtables). Either way, the emitter knows the exact location from the buffer end (minus vtables), so you can store this address in a separate index, or in memory. It is a virtual address because it is relative to a point in the buffer (virtual offset 0) from which the buffer grows in both directions. But if you store vtables separately, you know it is relative to the end.

You still can't read directly from disk, because most file systems don't allow you to prepend data, so you risk a table crossing a page boundary. But with a bit of bookkeeping and maximum table sizes, you can make safe guesses and handle the special cases in a dedicated memory buffer.

Use case: Multiple services logging events with some header/type info which can then be used to aggregate events and build a separate vtable for a vector of chronological events during a request/distributed job. At the same time, building the standard vtable that comes as a default "here's a list of all log events, in order, from Service A"
 
Note: a vtable only says which fields are used in a table; it is not for vectors. Vectors store tables as relative offsets from where the element is located in the buffer. But you noticed that later on.


FB can be used here too, and would be really cool if the statement about packing vectors back to back is true, loses real realtime, but realtime is a relative concept anyway. With an adjustable small enough time window to fill the buffer, it's practically realtime.
 
Tables are packed with the necessary padding for alignment. The format itself allows for large gaps, but children (except vtables) cannot be stored before the parent. (Had it been the opposite, on-disk storage while building would be simpler.) On the receiving end, you can allocate a reasonably large buffer and store back to front, with some space at the back left for new vtables, or likewise use a disk file. If you run out of space, you rewrite to a larger buffer or file. I would create a more advanced indexing system, but that is for extreme performance scenarios. Still, you can trivially read a buffer that is not finished: you take the pointer to a known table or vector in the buffer and cast it to the corresponding flatbuffer type. At least you can do this with flatcc in C. Then you have premature read access to all already-emitted data.
 

But if that statement is not true, then it is practically the same as sending framed messages, with additional indirection and overhead on the sending end :( No one likes paying for logging/tracing stuff in their application code.

Actually, flatc added this some time ago, and flatcc has also supported it for a while now. If you create each event as a buffer, you can ask for the buffer to be size-prefixed. Then you can simply stack the buffers in a file front to back. But it takes up more space, and you can again only scan one buffer at a time. You cannot do binary searches as you can in a vector of events where the table has a keyed field. Note: flatcc does not support sorting vectors before the buffer is complete, exactly because it supports this kind of streaming. So searching still has to wait until you can mmap and sort the vector (there is a call for that).
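
As a rough C++ illustration of the size-prefixed variant (flatcc looks different, but the idea is the same), assuming flatc-generated code for the schema earlier in the thread and a made-up file handle: each event is finished as its own size-prefixed buffer and appended to a file, so a reader can skip from one event to the next with a linear scan.

#include <fstream>
#include <string>

#include "flatbuffers/flatbuffers.h"
#include "log_generated.h"  // hypothetical name of the flatc-generated header

// Append one event as a size-prefixed flatbuffer to an already-open file.
void AppendEvent(std::ofstream& out, int64_t time,
                 const std::string& message, int8_t type) {
  flatbuffers::FlatBufferBuilder fbb;
  auto msg = fbb.CreateString(message);
  // FinishSizePrefixed stores a 32-bit length ahead of the buffer contents.
  fbb.FinishSizePrefixed(CreateEvent(fbb, time, msg, type));
  out.write(reinterpret_cast<const char*>(fbb.GetBufferPointer()), fbb.GetSize());
}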

When you say, "rebuild the vector", it means you already have it. In that case, ignore everything I said. The kind of use cases I was talking about are more on the lines of "I have individual tables", how do I send them over?
It does send the individual tables first and internally tracks their locations, then later converts them into a vector if the user asks it to do so. In principle you can also create tables that are never referenced anywhere, because it is the user's responsibility to make use of the reference returned when creating a table, e.g. using vec_push. But why not create and send the vector, or many small vectors in a hierarchy?

In more detail: 
When a table (say an event) is complete, the builder and the emitter both have its virtual address. If the table is to be placed in a containing vector, then it can be pushed to the vector: event_vec_push_start, add event data, event_vec_push_end. The end call then stores the virtual address on the builder's internal stack. This isn't a proper vector yet. But eventually you call eventlist_end(), or something like it, which indicates you are not going to be adding more tables. At this point the internal stack is rewritten so the virtual addresses (negative offsets from the vtable cluster start) become proper relative offsets from the vector element locations. These relative offsets are only known when the vector is completed. The next step is then to call the emitter with the stack content of the now valid vector. The emitter gets a virtual address for the vector start, but stores the vector as a now proper flatbuffer vector that can be accessed directly if all the pages are moved to a consecutive memory region, even if the buffer is still open. This consecutive region could also be on the other end of a network link.


Disclaimer: I am not familiar with C++/internals of fb.
Wherever I write vtable, I actually mean a lookup table/squint-eyes-index. "vtable" in fb might mean something else entirely. If that is so, sorry for the confusion.
Yes, I get it. 