Using NATS streaming for event sourcing with snapshots


Tim Fox

Aug 31, 2016, 5:39:52 AM
to nats
Hi Derek et al,

I am considering using NATS streaming server for event sourcing. In any particular stream there might be many billions of events and a total size of many terabytes. E.g. a log might contain all the add/remove events corresponding to users adding/removing items from their shopping baskets.

A couple of questions I have:

* Can the NATS streaming server work well with very large logs of this size?
* Does NATS streaming support any kind of snapshot functionality? With event sourcing a common pattern is to snapshot the state of an aggregate (a shopping basket in this case) every so often, so you don't have to replay billions of events from the beginning of time in order to load a particular aggregate (shopping basket).


Ivan Kozlovic

Aug 31, 2016, 11:01:07 AM
to nats
Hi Tim,

Thank you for your interest in NATS Streaming.

In terms of limitations, NATS Streaming - assuming you are going to use the file based store - needs file descriptors for each channel (subject), since messages are stored in a file (more than one actually, but only one active at a time), plus one for the subscriptions file on that channel. The number of channels will drive the number of needed file descriptors.

You can configure the Streaming server to a given maximum number of channels/messages/subscriptions.

As of now (though I am working on a change that should go in master soon), even the file based store keeps messages in memory, which would probably not be scalable for billions of messages. When the changes I am working on are merged, a small record per message will be kept in memory, and messages will be read from disk when they need to be delivered. Still, billions of messages probably mean quite a bit of memory.

To answer your questions, I first have to ask you a few ;-)

- How many channels (subjects) do you plan to use? Is it one per user? If so, how many users (therefore channels)? See limitations above.
- Could you describe what a snapshot would look like in the server? Note that in NATS Streaming, consumers can start consuming from anywhere in the channel (based on message sequence and/or timestamp); is that not enough?

Thanks,
Ivan.

Henrik Johansson

Aug 31, 2016, 1:29:11 PM
to nats

Afaik, when snapshotting is used in an event sourcing context, it is much too app-specific to allow any sort of middleware or database to do it automatically. If you know of any such implementations I wouldn't mind taking a look. ☺️


--
You received this message because you are subscribed to the Google Groups "nats" group.
To unsubscribe from this group and stop receiving emails from it, send an email to natsio+un...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Tim Fox

Aug 31, 2016, 2:22:09 PM
to nats
Hi Ivan, 

replies inline


On Wednesday, 31 August 2016 16:01:07 UTC+1, Ivan Kozlovic wrote:
Hi Tim,

Thank you for your interest on NATS Streaming.

In terms of limitations, NATS Streaming - assuming you are going to use the file based store - needs file descriptors for each channel (subject)

I was a little confused about what a "channel" is. I couldn't see a definition in the docs, and the terms channel and subject seem to be used interchangeably. I'm assuming they're the same and roughly equivalent to a "topic", i.e. something that clients can publish to and subscribe from. If so, we wouldn't have a lot of channels (maybe a few hundred) but they might contain a lot of messages.
 
since messages are stored in a file (more than one actually, but only one active at a time), plus one for the subscriptions file on that channel. The number of channels will drive the number of needed file descriptors.

You can configure the Streaming server to a given maximum number of channels/messages/subscriptions.

As of now (though I am working on a change that should go in master soon), even the file based store keeps messages in memory, which would probably not be scalable for billions of messages. When the changes I am working on are merged, a small record per message will be kept in memory, and messages will be read from disk when they need to be delivered. Still, billions of messages probably mean quite a bit of memory.

We will have a lot of messages, maybe 100s of billions, maybe trillions. I think we need a solution where there's nothing in memory on a per message basis.

 

To answer your questions, I first have to ask you a few ;-)

- How many channels (subjects) do you plan to use? Is it one per user? If so, how many users (therefore channels)? See limitations above.
- Could you describe what a snapshot would look like in the server?

We need the ability to publish a message (that's the snapshot, and it is application defined) and somehow get a "bookmark" to that snapshot. The "bookmark" will allow us to later resume a subscription from that bookmark without having to scan through all preceding messages (impractical if there are many billions), i.e. some capability to index into a position in the queue of messages.

Now, if we can get a sequence number of the message that we just published, then later ask to resume from that sequence number then maybe that will work. Is it possible to get the sequence number of a just published message?

Ivan Kozlovic

Aug 31, 2016, 3:49:46 PM
to nats
Tim,


I was a little confused about what a "channel" is. I couldn't see a definition in the docs, and the terms channel and subject seem to be used interchangeably. I'm assuming they're the same and roughly equivalent to a "topic", i.e. something that clients can publish to and subscribe from. If so, we wouldn't have a lot of channels (maybe a few hundred) but they might contain a lot of messages.

Channels are subjects, but without wildcards (not supported in NATS Streaming). So yes, you can see them as topics clients can send to/receive from.

Just so we are clear, you can set the maximum number of channels, and of subscribers and messages per channel, so if you set the number of messages to a very high number, those messages are never going to be removed. NATS Streaming uses a message log per channel: messages are appended, and the old ones are only discarded when the limit of messages is reached.

 
 
We will have a lot of messages, maybe 100s of billions, maybe trillions. I think we need a solution where there's nothing in memory on a per message basis.

I have to say that I have doubts about that number of messages in a single NATS Streaming server. I am not sure how an implementation could have 0 memory per message. How would the server look up a message (based on a sequence number) if it does not know where this message is located? Scanning the file to find it would obviously not scale ;-)

We need the ability to publish a message (that's the snapshot, and it is application defined) and somehow get a "bookmark" to that snapshot. The "bookmark" will allow us to later resume a subscription from that bookmark without having to scan through all preceding messages (impractical if there are many billions), i.e. some capability to index into a position in the queue of messages.

I am still confused about what that means.
 
Now, if we can get a sequence number of the message that we just published, then later ask to resume from that sequence number then maybe that will work. Is it possible to get the sequence number of a just published message?

No, the published message does not have the sequence number when leaving the client. The sequence is assigned by the server. We could(?) have added the sequence in the Ack response from server to client. However, when consuming, the received message does have a sequence number, timestamp, etc... It is from this angle that you could restart a consumer from a known sequence number/timestamp.

I would love for you to use NATS Streaming, but I want to make sure that this is a good fit for your project. Since I do not fully grasp what the context is, I am not able to really see if this could work or not, what approach I could suggest, etc... If you could elaborate, or better, contact us, that would be great. If you want to contact us, please send an email to our community manager, Brian Flannery (br...@nats.io).

Thanks!

Tim Fox

Aug 31, 2016, 5:43:26 PM
to nats


On Wednesday, 31 August 2016 20:49:46 UTC+1, Ivan Kozlovic wrote:
Tim,

I was a little confused about what a "channel" is. I couldn't see a definition in the docs, and the terms channel and subject seem to be used interchangeably. I'm assuming they're the same and roughly equivalent to a "topic", i.e. something that clients can publish to and subscribe from. If so, we wouldn't have a lot of channels (maybe a few hundred) but they might contain a lot of messages.

Channels are subjects, but without wildcards (not supported in NATS Streaming). So yes, you can see them as topics clients can send to/receive from.

Just so we are clear, you can set the maximum number of channels, and of subscribers and messages per channel, so if you set the number of messages to a very high number, those messages are never going to be removed. NATS Streaming uses a message log per channel: messages are appended, and the old ones are only discarded when the limit of messages is reached.

 
 
We will have a lot of messages, maybe 100s of billions, maybe trillions. I think we need a solution where there's nothing in memory on a per message basis.

I have to say that I have doubts about that number of messages in a single NATS Streaming server. I am not sure how an implementation could have 0 memory per message. How would the server look up a message (based on a sequence number) if it does not know where this message is located? Scanning the file to find it would obviously not scale ;-)

Using an index on disk (this is how relational databases can do fast lookups without having to store pointers in memory). Indexes are often implemented using structures such as b-trees on disk. Like the index in a phone book, a b-tree allows you to quickly look up the location where the entry you want is stored.
 

We need the ability to publish a message (that's the snapshot, and it is application defined) and somehow get a "bookmark" to that snapshot. The "bookmark" will allow us to later resume a subscription from that bookmark without having to scan through all preceding messages (impractical if there are many billions), i.e. some capability to index into a position in the queue of messages.

I am still confused about what that means.

Imagine every message in a channel has an incrementing sequence number. At some point I publish a message with sequence number X. Some time later (maybe much later) I ask for the server to stream me all messages from X in that channel. X is the bookmark (or snapshot point)
 
 
Now, if we can get a sequence number of the message that we just published, then later ask to resume from that sequence number then maybe that will work. Is it possible to get the sequence number of a just published message?

No, the published message does not have the sequence number when leaving the client. The sequence is assigned by the server. We could(?) have added the sequence in the Ack response from server to client.

This would be necessary - otherwise the client won't know what the snapshot point is (the X in the previous example).

Henrik Johansson

Aug 31, 2016, 6:32:10 PM
to nats

Can't you let each aggregate handle its own snapshotting? Every X events, or at certain time intervals (whichever comes first), the aggregate can just choose to save its current state and be done?

Maybe I am missing something?



Tim Fox

Sep 1, 2016, 4:18:20 AM
to nats
Sure, it's always the app that does the actual snapshotting. The issue is in getting to the last snapshot quickly when replaying without scanning through billions of messages.

Henrik Johansson

Sep 1, 2016, 6:46:30 AM
to nats

But can't you use StartAtSequence?
That way you only have to track this number.

I am sure I am oversimplifying things but it seems like it could work?

Tim Fox

Sep 1, 2016, 6:48:01 AM
to nat...@googlegroups.com
>> But can't you use StartAtSequence? 

Well... this is what I suggested earlier in the thread, but you need to know the sequence number of when you published the snapshot and apparently that's not available.


Henrik Johansson

Sep 1, 2016, 6:51:20 AM
to nat...@googlegroups.com

Maybe not, if the client maintains its own state via consumption of its own events?



Ivan Kozlovic

Sep 2, 2016, 10:16:08 AM
to nats
>> But can't you use StartAtSequence? 

Well... this is what I suggested earlier in the thread, but you need to know the sequence number of when you published the snapshot and apparently that's not available.

We are thinking of changing the API (and server) to be able to get the Sequence from the Publish(Async) call. The server could return the Sequence as part of the ACK to the publisher.

Also, you wrote:

>> Imagine every message in a channel has an incrementing sequence number. 

It actually does ;-)

>> At some point I publish a message with sequence number X. Some time later 
>> (maybe much later) I ask for the server to stream me all messages from X in 
>> that channel. X is the bookmark (or snapshot point)

This means that the source of the events (the publisher) would decide at one point that the event inserted in the stream is a snapshot or bookmark and would have to save that sequence number and/or communicate that sequence number to the consumer, so it can use StartAtSequence to start consuming from that point on. Did I get it right?

Thanks!

Tim Fox

Sep 2, 2016, 12:53:19 PM
to nat...@googlegroups.com
>> This means that the source of the events (the publisher) would decide at one point that the event inserted in the stream is a snapshot or bookmark and would have to save that sequence number and/or communicate that sequence number to the consumer, so it can use StartAtSequence to start consuming from that point on. Did I get it right?

That's one way of doing it. But the trouble with that is that it puts the onus of remembering the bookmark on the client. It would be better if the server could remember that state.

I.e. if you could say to the server: "Bookmark channel X now", then some time later say "Please replay channel X from last bookmark"

Ivan Kozlovic

Sep 2, 2016, 1:05:38 PM
to nats
I see. But would you have more than 1 bookmark per channel? If so, then you move the problem to knowing which bookmark you are asking the server to replay from. Say the bookmark list for channel X contains these sequences: 1234, 45678, 1747362. Say that a sequence is added any time you ask the server to bookmark the channel, and that it is the current sequence number for that channel at the time the bookmark request is received. Your consumer would then use StartAtBookmark(bookmark index), but then you need to know which bookmark that is, no?

Tim Fox

Sep 3, 2016, 4:11:12 AM
to nats
Yes, ideally bookmarks would be named, i.e. each channel would maintain a map of bookmark_name to position.

Kenneth Lerman

Dec 26, 2016, 2:55:15 PM
to nats
Hi Tim,

It looks like snapshotting would have to be a DIY effort, but did you ever get the answer to whether NATS streaming could handle (store) many billions of events?

Hi Ivan,

I've seen that there is a 2^32 limit on the number of subjects. Also that there is a need for one file descriptor (fd) per subject. Am I correct that the fd requirement is the number of subjects that are currently "in use" rather than the total number of subjects?

Where are the indices stored in the file based system? If in memory, that would seem to place a significant constraint on the number of events to be stored. How much memory is required for storage of event indices?

Thanks,

Ken

Kenneth Lerman

Ivan Kozlovic

Dec 26, 2016, 5:01:44 PM
to nats
Kenneth,
 
It looks like snapshotting would have to be a DIY effort, but did you ever get the answer to whether NATS streaming could handle (store) many billions of events?

There are no hard-coded limits, so it will depend on the resources you have at your disposal.
 
I've seen that there is a 2^32 limit on the number of subjects.

Not sure where you got that limit. The subjects, aka channels, are stored in a map[string].
 
Also that there is a need for one file descriptor (fd) per subject.

For the file based store, there are actually now a minimum of 2 (one for the data file, one for the index file). If you have subscribers on that channel, there will be 1 more for the subscribers file.
 
Am I correct that the fd requirement is the number of subjects that are currently "in use" rather than the total number of subjects?

No. The file store always maintains the last file slice (where messages are appended) of each channel open, which again means 2 FDs (one for data, one for index). More may be needed if a lookup occurs for a message that is not in the current file slice. Those, however, are closed after a short period of time.
 

Where are the indices stored in the file based system? If in memory, that would seem to place a significant constraint on the number of events to be stored.

Correct. The file store maintains a map[uint64]*msgRecord. The key is the message sequence number. The value is a record that contains the offset in the file, the timestamp and the size of the message. This is "needed" because the server may need the timestamp when a subscriber starts with a StartAt() with time/time delta. The message size is also recorded so that the file store can keep track of the overall file slice size to satisfy file slice limits (and overall channel limits).
 
How much memory is required for storage of event indices?

The msgRecord is 20 bytes and the key 8 bytes, which means a minimum of 28 bytes per message.

As we have always said, the current basic file store implementation has many limits (FDs, memory, etc...) that may not be suitable for some users. We welcome other store implementations (we have a store interface to facilitate new implementations). The current implementation, though, offers pretty good performance for users that do not need to store that many messages and don't have that many channels.

Hope this helps!
Ivan.

Kenneth Lerman

Dec 26, 2016, 5:08:00 PM
to nat...@googlegroups.com
Thanks for the prompt reply.

That helps a lot.

Regards,

Ken

Kenneth Lerman
55 Main Street
Newtown, CT 06470


On Mon, Dec 26, 2016 at 5:01 PM, Ivan Kozlovic <ivan.k...@apcera.com> wrote:
I've seen that there is a 2^32 limit on the number of subjects.

Not sure where you got that limit. The subjects, aka channels, are stored in a map[string].

Does NATS impose any limits on the # of subjects?
The maximum number of subjects is currently 2^32 (i.e. the max value of Go’s uint32 type). This may change in the future. The current implementation (which predates some native Go data structures) is a custom Hashmap. We will eventually move to native Go data structures as we test and verify relative performance.



Ivan Kozlovic

Dec 26, 2016, 5:16:50 PM
to nats
Ken,

Does NATS impose any limits on the # of subjects?
The maximum number of subjects is currently 2^32 (i.e. the max value of Go’s uint32 type). This may change in the future. The current implementation (which predates some native Go data structures) is a custom Hashmap. We will eventually move to native Go data structures as we test and verify relative performance.

I see. But this section applies to NATS (not Streaming) and since gnatsd v0.8.0 (which was released in May 2016), this is no longer the case. We will need to fix the documentation.
Thanks!