CRDT gossip implementation - replication flow


Vadim Punski

Jan 29, 2017, 10:11:38 AM
to Akka User List
Hi,

I'm new to Akka and CRDTs, so take the below with a grain of salt.
I'm trying to implement my application's requirements with thousands of LWW-Maps that are not related to each other.
The implementation itself is not my main issue; it's the internal gossip protocol implementation of the "distributed data" module that I found interesting...

I suppose the problem below stems from my understanding (or lack of understanding) of CRDTs, or from the current implementation of the distributed-data module...

Let's say I have 10K unrelated CRDT-based data type instances, for example 10K LWW-Maps. From the application's point of view they may be out of sync with each other, but each one should eventually become consistent across the cluster, using Local consistency. By reading the distributed-data source code, I've found that every LWW-Map key created (like any other CRDT data type instance) "lives" in the Replicator.dataEntries map.

Then, on each tick interval, the gossip protocol replicates a configured chunk size of data, but no more than 10 chunks at once (Replicator.gossipTo()?). This means the whole dataEntries map is replicated, no matter how many LWW-Maps were changed, if any at all. Replication works by the gossip-initiating node sending digests of all (!) LWW-Maps (the entries in dataEntries, via a Status message), and then the differences being requested and sent back, if any were found (a Gossip message, one more round trip).
That means that even with zero changes to my 10K LWW-Map entries, with a few entries per LWW-Map instance, 10K digests will be sent to all nodes on every gossip tick. I see this behavior in the logs...
This is the flow I see in the 2.4.16 release.
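To make sure I read the code right, here is a toy model of that round trip (Python for brevity; `Node`, `status`, and `diff_against` are made-up names, not Akka's API):

```python
import hashlib
import pickle

def digest(value):
    # Stand-in for the per-entry digest carried in the Status message.
    return hashlib.sha1(pickle.dumps(value)).digest()

class Node:
    """Toy model of Replicator.dataEntries and one gossip exchange."""
    def __init__(self, entries):
        self.entries = dict(entries)  # key -> CRDT state

    def status(self):
        # Round trip 1: digests of ALL entries, changed or not.
        return {k: digest(v) for k, v in self.entries.items()}

    def diff_against(self, remote_status):
        # Round trip 2: ship only entries whose digest differs or is missing.
        return {k: v for k, v in self.entries.items()
                if remote_status.get(k) != digest(v)}

a = Node({"cart-1": {"x": 1}, "cart-2": {"y": 2}})
b = Node({"cart-1": {"x": 1}})

changed = a.diff_against(b.status())  # only cart-2 crosses the wire
b.entries.update(changed)             # a real CRDT would merge here, not overwrite
```

Even when nothing changed, `status()` still covers every key, which is exactly the overhead I'm worried about.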

In 2.5.0-SNAPSHOT, a delta-CRDT approach is introduced. Although the documentation and the referenced article talk about delta changes within a CRDT data type (e.g. the c and d elements of a set), the replication mechanism works at the key -> dataEnvelope level, i.e. on the whole changed entry of the dataEntries map (deltaPropagationSelector.update(key.id, d) ...). Notice that replication is not performed by sending the delta changes of a single data type instance (the changed entries of a particular set, for example), but by replicating the whole set. In the current implementation, "delta" means that only changed instances are replicated by the gossip protocol, not the actual delta of a CRDT data type instance.

I spotted this behavior with 1K unrelated LWW-Maps under different key names: the system sent 1K digest signatures to all nodes without a single change (in 2.4.16), and changing a single map entry fires the same flow anyway.

The 2.5.0-SNAPSHOT version should improve on this, but the delta message is still not built from a single CRDT type instance; it carries all entries of the dataEntries map that were changed, even if only a single entry was changed in a 10-entry LWW-Map.

So, based on my understanding, my questions are:
- In 2.4.16, what is the rationale for treating the unrelated CRDT instances in the dataEntries map as one composite CRDT and replicating them as a single structure, instead of replicating each instance separately? If my approach is wrong, what is the right way to do it?
- In 2.5.0, why does the delta mechanism work at the dataEntries map level, while the documentation and the article talk about the delta of a single CRDT data type instance (the members of a CRDT set)? Is my understanding correct? If so, what is the reason for the current implementation?
- What is the best way to implement a large number of unrelated CRDT data type instances with Local consistency while reducing the replication overhead?


The current implementation creates a huge amount of inter-node traffic and CPU overhead, and even fewer than 100K entries are pretty heavy. I don't really understand how a "shopping cart"-like example (with Local consistency!) with 10K simultaneously logged-in users and 10K shopping carts replicated every gossip interval (the digest round trip saves a lot, but still) is supposed to work in real life. In my opinion, working with Local consistency should improve performance compared to any other consistency level, but for a large number of CRDT data type instances that is not what I see in my performance tests...


P.S. I also found that the "Subscribe" mechanism uses a kind of "delta" approach even in 2.4.16, as it saves all changed entries in the "Replicator.changed" set for later notifications. So a per-entry delta mechanism for the dataEntries map exists, but only for notifications...


Vadim Punski

Patrik Nordwall

Jan 30, 2017, 9:29:13 AM
to akka...@googlegroups.com
On Sun, Jan 29, 2017 at 3:04 AM, Vadim Punski <vpu...@gmail.com> wrote:

Then, on each tick interval, the gossip protocol replicates a configured chunk size of data, but no more than 10 chunks at once (Replicator.gossipTo()?). This means the whole dataEntries map is replicated, no matter how many LWW-Maps were changed, if any at all. Replication works by the gossip-initiating node sending digests of all (!) LWW-Maps (the entries in dataEntries, via a Status message), and then the differences being requested and sent back, if any were found (a Gossip message, one more round trip).
That means that even with zero changes to my 10K LWW-Map entries, with a few entries per LWW-Map instance, 10K digests will be sent to all nodes on every gossip tick. I see this behavior in the logs...

It's sent to one random node per tick, not all nodes. The reason for sending all digests is to be able to find entries that are not the same. Remember that message delivery is at-most-once and there is no redelivery and acking here to make it anything else. Also new nodes joining the cluster need to sync up.
 
This is the flow I see in the 2.4.16 release.

In 2.5.0-SNAPSHOT, a delta-CRDT approach is introduced. Although the documentation and the referenced article talk about delta changes within a CRDT data type (e.g. the c and d elements of a set), the replication mechanism works at the key -> dataEnvelope level, i.e. on the whole changed entry of the dataEntries map (deltaPropagationSelector.update(key.id, d) ...). Notice that replication is not performed by sending the delta changes of a single data type instance (the changed entries of a particular set, for example), but by replicating the whole set. In the current implementation, "delta" means that only changed instances are replicated by the gossip protocol, not the actual delta of a CRDT data type instance.

Not sure I follow. Note that delta-CRDT support is only implemented for GCounter and PNCounter so far. It's not implemented for LWW-Map yet, but that should be possible so that only the changed entries in the LWW-Map are sent as the delta.
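For the counters where it is implemented, the delta idea can be sketched like this (a toy Python model under my own naming, not Akka's GCounter implementation):

```python
class GCounter:
    """Toy delta-state grow-only counter: a sketch of the idea only."""
    def __init__(self):
        self.state = {}  # node_id -> per-node count
        self.delta = {}  # entries changed since the last propagation

    def increment(self, node_id, n=1):
        self.state[node_id] = self.state.get(node_id, 0) + n
        self.delta[node_id] = self.state[node_id]  # delta carries only this entry

    def take_delta(self):
        d, self.delta = self.delta, {}
        return d

    def merge(self, other):
        # Pointwise max: idempotent, commutative, associative, so receiving
        # the same delta twice (or out of order) is harmless.
        for k, v in other.items():
            self.state[k] = max(self.state.get(k, 0), v)

    @property
    def value(self):
        return sum(self.state.values())

a, b = GCounter(), GCounter()
a.increment("n1"); a.increment("n1")
b.increment("n2")
d = a.take_delta()       # {"n1": 2} -- not a's full state
b.merge(d)
b.merge(d)               # duplicate delivery changes nothing
a.merge(b.take_delta())
```

Because the merge is a pointwise max, duplicated or reordered deltas are harmless, which is what makes fire-and-forget propagation safe.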
 


The current implementation creates a huge amount of inter-node traffic and CPU overhead, and even fewer than 100K entries are pretty heavy. I don't really understand how a "shopping cart"-like example (with Local consistency!) with 10K simultaneously logged-in users and 10K shopping carts replicated every gossip interval (the digest round trip saves a lot, but still) is supposed to work in real life. In my opinion, working with Local consistency should improve performance compared to any other consistency level, but for a large number of CRDT data type instances that is not what I see in my performance tests...


How large are your Status messages (with the digests)? You can reduce the chunk size or increase the tick interval, at the cost of longer dissemination times.
 



--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscribe@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.



--

Patrik Nordwall
Akka Tech Lead
Lightbend -  Reactive apps on the JVM
Twitter: @patriknw

Vadim Punski

Jan 31, 2017, 8:42:58 AM
to Akka User List


On Monday, January 30, 2017 at 4:29:13 PM UTC+2, Patrik Nordwall wrote:

It's sent to one random node per tick, not all nodes.
 
One node per tick, you're right...
 
The reason for sending all digests is to be able to find entries that are not the same. Remember that message delivery is at-most-once and there is no redelivery and acking here to make it anything else. Also new nodes joining the cluster need to sync up.
 
It's possible to get all changed entries from the local node's knowledge of which key entries were changed; the code already does this for the "subscribe for changes" mechanism.
As I understand it, idempotence guarantees that more-than-once delivery will not break anything, even if a replication message is sent twice.
Commutativity guarantees that states can be merged in any order.
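For example, both properties are easy to see with a last-writer-wins register (a toy sketch, not Akka's LWWRegister):

```python
def lww_merge(a, b):
    # Registers are (timestamp, value) pairs; lexicographic max gives
    # last-writer-wins with a deterministic tie-break on the value.
    return max(a, b)

x = (10, "red")
y = (12, "blue")

assert lww_merge(x, y) == lww_merge(y, x) == y  # commutative: order doesn't matter
assert lww_merge(x, x) == x                     # idempotent: duplicates are harmless
assert lww_merge(lww_merge(x, y), x) == y       # re-merging old state changes nothing
```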
  
I'd be happy to understand the reason for the round trip of the whole set of digests. I didn't find any specific "strict" consistency requirement on the replication flow and transport method.
So why not just send the entries of the dataEntries map that were changed recently, without the full digest round trip and difference detection?


How large are your Status messages (with the digests)? You can reduce the chunk size or increase the tick interval, at the cost of longer dissemination times.

In our application a very low percentage of instances gets updated in each tick period, but at a very high rate. The total number of CRDT type instances in the system may reach hundreds of thousands (at the moment). I'm trying to improve the behavior by reducing the "all keys/digests round trip" to "changed keys" only. This is prior to any delta improvements, which look even more promising...
I could use a different consistency level like WriteMajority to improve this, but performance would suffer, and many unneeded replications would be fired for every Update...

I would appreciate any ideas.

Vadim Punski

Patrik Nordwall

Jan 31, 2017, 9:02:51 AM
to akka...@googlegroups.com
On Tue, Jan 31, 2017 at 6:42 AM, Vadim Punski <vpu...@gmail.com> wrote:


 
The reason for sending all digests is to be able to find entries that are not the same. Remember that message delivery is at-most-once and there is no redelivery and acking here to make it anything else. Also new nodes joining the cluster need to sync up.
 
It's possible to get all changed entries from the local node's knowledge of which key entries were changed; the code already does this for the "subscribe for changes" mechanism.
As I understand it, idempotence guarantees that more-than-once delivery will not break anything, even if a replication message is sent twice.
Commutativity guarantees that states can be merged in any order.
  
I'd be happy to understand the reason for the round trip of the whole set of digests. I didn't find any specific "strict" consistency requirement on the replication flow and transport method.
So why not just send the entries of the dataEntries map that were changed recently, without the full digest round trip and difference detection?

Local knowledge is not enough. You would have to keep track of the differences on a per-node-pair basis. E.g. change A on node1 and gossip it to node2, which in turn gossips it to node3. Now node1 doesn't know that node3 already has it.

The idea is that the Status messages should be small enough and not be sent very often (once per second is not often), and the important thing is not to send the full data values more often than needed.
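A toy model of why push-only propagation of "recently changed" keys is not enough under at-most-once delivery, while the periodic digest exchange repairs the gap (illustrative Python, nothing from Akka):

```python
def push_only(nodes, changes, lose_first_push):
    """Each change is pushed once to each peer; a lost push is gone forever."""
    for key, value in changes:
        nodes[0][key] = value
        for i, peer in enumerate(nodes[1:]):
            if lose_first_push and i == 0:
                continue  # the message to this peer was dropped
            peer[key] = value

def anti_entropy(nodes):
    """Digest-style repair: compare key sets and copy whatever is missing."""
    for a in nodes:
        for b in nodes:
            for k, v in a.items():
                b.setdefault(k, v)

nodes = [{}, {}, {}]
push_only(nodes, [("cart-1", "x")], lose_first_push=True)
# node 1 missed the change and, without digests, would never learn about it:
missed = "cart-1" not in nodes[1]
anti_entropy(nodes)  # the periodic digest exchange repairs the gap
repaired = all("cart-1" in n for n in nodes)
```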
 


How large are your Status messages (with the digests)? You can reduce the chunk size or increase the tick interval, at the cost of longer dissemination times.

In our application a very low percentage of instances gets updated in each tick period, but at a very high rate. The total number of CRDT type instances in the system may reach hundreds of thousands (at the moment).

Then you are pushing it beyond the current limitations. That's why I included the documentation link in my previous reply.

Especially if these top-level keys are dynamic (added and removed) it is not a good approach, because a tombstone marker remains for each deleted key. Instead, for such things you should use ORMap or LWWMap as the top-level entries and limit those to a few thousand. Then you can add/remove the actual entries inside those maps. The choice of top-level map can be done with hashing.
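The hashing idea could look like this (a sketch; `NUM_TOP_LEVEL_MAPS` and `top_level_key` are made-up names, not an Akka API):

```python
import hashlib

NUM_TOP_LEVEL_MAPS = 1000  # illustrative; "a few thousand" per the advice above

def top_level_key(entry_key: str) -> str:
    # Stable hash so the same entry always lands in the same top-level map.
    h = int(hashlib.md5(entry_key.encode()).hexdigest(), 16)
    return "map-%d" % (h % NUM_TOP_LEVEL_MAPS)

# 100K logical entries collapse onto at most 1000 replicated top-level keys,
# so a Status message carries at most 1000 digests instead of 100K:
buckets = {top_level_key("user-%d" % i) for i in range(100_000)}
assert len(buckets) <= NUM_TOP_LEVEL_MAPS
```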

 


Vadim Punski

Jan 31, 2017, 9:26:56 AM
to Akka User List


On Tuesday, January 31, 2017 at 4:02:51 PM UTC+2, Patrik Nordwall wrote:



Local knowledge is not enough. You would have to keep track of the differences on a per-node-pair basis. E.g. change A on node1 and gossip it to node2, which in turn gossips it to node3. Now node1 doesn't know that node3 already has it.

I think I'm closer now. I missed the wasteful duplicate data replication to nodes that already have the data...

So, the digest-sending mechanism reduces wasted traffic by:
- sending digests before data, to save traffic
- avoiding sending data to nodes that already received the change

Merging the ideas from all the previous posts: why not send digests of the changed key entries only?


The idea is that the Status messages should be small enough and not be sent very often (once per second is not often) and the important thing is to not send the full data values more than needed.
1 second... that's a matter of requirements and of reducing the probability of stale reads...
 
 


How large are your Status messages (with the digests)? You can reduce the chunk size or increase the tick interval, at the cost of longer dissemination times.

In our application a very low percentage of instances gets updated in each tick period, but at a very high rate. The total number of CRDT type instances in the system may reach hundreds of thousands (at the moment).

Then you are pushing it beyond current limitations. That's why I included the documentation link in previous reply.

Especially if these top-level keys are dynamic (added and removed) it is not a good approach, because a tombstone marker remains for each deleted key. Instead, for such things you should use ORMap or LWWMap as the top-level entries and limit those to a few thousand. Then you can add/remove the actual entries inside those maps. The choice of top-level map can be done with hashing.

The top-level keys are pretty static; currently we don't have that problem...
What will happen in our scenario is that many maps will have changed entries, because of the normal distribution of data across all of them... so over every time period all the maps will accumulate many small changes, which will fire "global" replication and actual data replication, because all the digests will differ... That may be even worse than sending a large number of digests but very few data entries...


 


Patrik Nordwall

Jan 31, 2017, 1:38:00 PM
to akka...@googlegroups.com
On Tue, Jan 31, 2017 at 7:26 AM, Vadim Punski <vpu...@gmail.com> wrote:




The idea is that the Status messages should be small enough and not be sent very often (once per second is not often) and the important thing is to not send the full data values more than needed.
1 second... that's a matter of requirements and of reducing the probability of stale reads...

I think that will be improved when there is delta-CRDT support for the data types you use, because deltas will be quick and you can keep the full-state gossip interval rather long (just to cover cases where deltas were not delivered, and to sync up new nodes).
 
 
 


How large are your Status messages (with the digests)? You can reduce the chunk size or increase the tick interval, at the cost of longer dissemination times.

In our application a very low percentage of instances gets updated in each tick period, but at a very high rate. The total number of CRDT type instances in the system may reach hundreds of thousands (at the moment).

Then you are pushing it beyond current limitations. That's why I included the documentation link in previous reply.

Especially if these top-level keys are dynamic (added and removed) it is not a good approach, because a tombstone marker remains for each deleted key. Instead, for such things you should use ORMap or LWWMap as the top-level entries and limit those to a few thousand. Then you can add/remove the actual entries inside those maps. The choice of top-level map can be done with hashing.

The top-level keys are pretty static; currently we don't have that problem...
What will happen in our scenario is that many maps will have changed entries, because of the normal distribution of data across all of them... so over every time period all the maps will accumulate many small changes, which will fire "global" replication and actual data replication, because all the digests will differ... That may be even worse than sending a large number of digests but very few data entries...

Same comment as above: it will be better with delta-CRDT support.
 

Vadim Punski

Feb 2, 2017, 8:07:47 AM
to Akka User List
What is the road map and timeline?

Patrik Nordwall

Feb 2, 2017, 8:56:51 AM
to Akka User List
Hopefully we can have a release candidate of 2.5.0 out within a month. The more people who help out with the remaining tasks, the quicker we can get it done. See the issues in the 2.5.0 milestone on GitHub. We can also release another milestone build if needed.

/Patrik
On Thu, Feb 2, 2017 at 06:07, Vadim Punski <vpu...@gmail.com> wrote:
What is the road map and timeline?
