TL;DR: I have created a prototype for sharding of Aggregate Roots in Akka Cluster.

I have been thinking about what we need to have good support for akka-persistence in akka-cluster. For background, there have been several interesting threads on the mailing list that touch this topic. The most related is probably "Creating one instance of an Akka actor per entity (domain-driven-design)".

My conclusion is that we need a sharding mechanism that can allocate Aggregate Root instances to nodes in the cluster and make them transparently accessible from other nodes.
At a high level:

- On each node you run a Repository actor. (Several of these are possible.)
- All messages to an Aggregate Root go through the Repository.
- Aggregate Roots are grouped into Shards, via a user-provided function from Aggregate Root Id to Shard Id (a small sketch of what that might look like follows this overview). A Shard is the smallest piece that can be allocated to a Repository, and also the smallest piece that is migrated when rebalancing.
- There is a ShardCoordinator, which is the central authority for allocating Shards to Repositories. It is a cluster singleton and has persistent state of the shard locations.
- When a Repository receives a message for an Aggregate Root (Shard) that it doesn't know about, it sends a request to the ShardCoordinator. All messages for that Shard are buffered until the Repository receives the location of the Shard.
- If the location of the Shard is the Repository itself, the message is delivered to the child actor representing the Aggregate Root; otherwise it forwards the message to the responsible Repository. The Repository caches the Shard locations.
- Aggregate Root child actors are created on demand from user-provided Props. An Aggregate Root actor is typically a Processor or EventSourcedProcessor. The Aggregate Root can stop itself when it is idle to reduce memory consumption.
- Ordinary death watch is used between the Repositories and the ShardCoordinator.
- The ShardCoordinator rebalances the shards, e.g. when new nodes join the cluster. This is the most difficult part; for example, we must avoid message re-ordering when a Shard is migrated from one Repository to another.

It is important to highlight that in this solution I don't try to solve:

1. Distributed journal. That should be handled by a journal implementation backed by a distributed data/event store.
2. Reliable messaging between client and Aggregate Root. That should be solved by channels.
3. Better failure detection or management of network partitions than what is provided by Akka Cluster.
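To make the user-provided pieces concrete, here is a minimal sketch of what the shard function and entry Props could look like. All names (Order, OrderSharding, shardIdOf, the shard count) are illustrative assumptions, not part of the prototype:

import akka.actor.{ Actor, Props }

// Hypothetical Aggregate Root actor; in the prototype this would typically be a
// Processor or EventSourcedProcessor from akka-persistence.
class Order extends Actor {
  def receive = {
    case command => // apply the command to this Aggregate Root's state (omitted)
  }
}

object OrderSharding {
  // User-provided Props from which the Repository creates entry children on demand.
  val entryProps: Props = Props[Order]

  // User-provided function from Aggregate Root Id to Shard Id.
  // Here: hash the id into a fixed number of shards.
  val numberOfShards = 10
  def shardIdOf(aggregateRootId: String): String = {
    val h = aggregateRootId.hashCode % numberOfShards
    (if (h < 0) h + numberOfShards else h).toString
  }
}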
Hi Patrik,

I have some feedback. I'm afraid it's mainly questions to help me understand the solution better! See below...

Andrew
On Tuesday, 19 November 2013 06:56:28 UTC-8, patriknw wrote:

TL;DR: I have created a prototype for sharding of Aggregate Roots in Akka Cluster.

I have been thinking about what we need to have good support for akka-persistence in akka-cluster. For background, there have been several interesting threads on the mailing list that touch this topic. The most related is probably "Creating one instance of an Akka actor per entity (domain-driven-design)".
My conclusion is that we need a sharding mechanism that can allocate Aggregate Root instances to nodes in the cluster and make them transparently accessible from other nodes.
At a high level:

- On each node you run a Repository actor. (Several of these are possible.)
So, you run a Repository actor, for every aggregate root type, on every node?
- All messages to an Aggregate Root go through the Repository.

So, a Repository is somewhat analogous to what we'd previously discussed with regard to a parent coordinator? We'd previously talked about this being a cluster singleton but obviously weren't satisfied with that approach (hence your new solution!).
- Aggregate Roots are grouped into Shards, via a user-provided function from Aggregate Root Id to Shard Id. A Shard is the smallest piece that can be allocated to a Repository and also the smallest piece that is migrated when rebalancing.

Can aggregate root types each have a different sharding strategy?
- There is a ShardCoordinator, which is the central authority for allocating Shards to Repositories. It is a cluster singleton and has persistent state of the shard locations.

Is there only one ShardCoordinator for all aggregate root types?
- When a Repository receives a message for an Aggregate Root (Shard) that it doesn't know about, it sends a request to the ShardCoordinator. All messages for that Shard are buffered until the Repository receives the location of the Shard.
- If the location of the Shard is the Repository itself, the message is delivered to the child actor representing the Aggregate Root; otherwise it forwards the message to the responsible Repository. The Repository caches the Shard locations.

This is probably an ill-informed question, but how does a client choose which Repository to talk to? Obviously it can talk to any one (as messages will be forwarded if necessary) but, still, is there some de facto way of deciding which node to hit?
- Aggregate Root child actors are created on demand from user-provided Props. An Aggregate Root actor is typically a Processor or EventSourcedProcessor. The Aggregate Root can stop itself when it is idle to reduce memory consumption.

It's not clear to me where an aggregate times itself out. Certainly nothing that resembles this discussion: https://groups.google.com/d/msg/akka-user/OH_38apIzAs/oonugFy2zKQJ. I can see use of ReceiveTimeout in RebalanceWorker, but I'm not able to understand how this is related to aggregate root timeout?
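(For illustration, a minimal sketch of how an entry could time itself out using ReceiveTimeout; this is an assumption about one way it could work, not taken from the prototype:)

import akka.actor.{ Actor, ReceiveTimeout }
import scala.concurrent.duration._

class IdleStoppingEntry extends Actor {
  // If no message arrives within this window, the actor receives ReceiveTimeout.
  context.setReceiveTimeout(2.minutes)

  def receive = {
    case ReceiveTimeout =>
      // Stop to free memory; the entry is recreated from its journal on the next message.
      // A complete solution would coordinate with the parent Repository so that
      // messages arriving concurrently with the stop are not lost.
      context.stop(self)
    case command => // handle domain messages here
  }
}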
- Ordinary death watch is used between the Repositories and the ShardCoordinator.
- The ShardCoordinator rebalances the shards, e.g. when new nodes join the cluster. This is the most difficult part; for example, we must avoid message re-ordering when a Shard is migrated from one Repository to another.

What happens to new messages that arrive during rebalancing? It looks like PoisonPill is used, which should allow existing messages to be consumed before termination. What happens, though, to messages that arrive after the PoisonPill whilst rebalancing is taking place? I've most likely missed something in the code...
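(One way to picture the handoff, as a conceptual sketch rather than the prototype's actual code: the region buffers messages for a shard while it is being migrated and flushes them, in order, once the new home is known. The names below are made up:)

import akka.actor.ActorRef
import scala.collection.immutable.Queue

// Conceptual sketch only: buffer messages per shard during handoff, flush them in
// order to the new home once the coordinator has announced it.
case class HandoffBuffer(buffers: Map[String, Queue[(Any, ActorRef)]] = Map.empty) {
  def buffer(shardId: String, msg: Any, snd: ActorRef): HandoffBuffer =
    copy(buffers = buffers.updated(shardId,
      buffers.getOrElse(shardId, Queue.empty).enqueue((msg, snd))))

  def flush(shardId: String, newHome: ActorRef): HandoffBuffer = {
    buffers.getOrElse(shardId, Queue.empty).foreach {
      case (msg, snd) => newHome.tell(msg, snd)
    }
    copy(buffers = buffers - shardId)
  }
}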
It is important to highlight that in this solution I don't try to solve:
1. Distributed journal. That should be handled by a journal implementation backed by a distributed data/event store.
2. Reliable messaging between client and Aggregate Root. That should be solved by channels.

So, if a node goes down, I assume any messages within an aggregate root's mailbox would be lost (unless doing something with channels)?
3. Better failure detection or management of network partitions than what is provided by Akka Cluster.

Just so I understand, are you implying that, in theory, you could end up in a situation where you have two separate clusters, each one potentially making changes to the same aggregate roots?
You find the gory details in the code: https://github.com/akka/akka/pull/1843

This is still in the prototype stage and I appreciate any kind of feedback. Try to come up with problematic corner cases.

Looking forward to the discussions on this.
3. Better failure detection or management of network partitions than what is provided by Akka Cluster.
Just so I understand, are you implying that, in theory, you could end up in a situation where you have two separate clusters, each one potentially making changes to the same aggregate roots?
If you use auto-down it will form two separate clusters, which will result in two coordinators and therefore possibly two active aggregate roots with the same id. Instead you should use manual downing, or more carefully crafted automatic downing of unreachable nodes.
It would be nice if the distributed journal could detect and prevent more than one active writable processor per processor id, but that is probably difficult.
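(For completeness, a configuration sketch; the setting name below is the one in Akka 2.3, so treat it as an assumption for whatever version you run:)

import com.typesafe.config.ConfigFactory

// Keep automatic downing disabled (the default), so an unreachable node is not
// downed automatically -- which is what can otherwise split the cluster into two
// and give you two coordinators.
val clusterConfig = ConfigFactory.parseString("""
  akka.cluster.auto-down-unreachable-after = off
""")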
Thank you for the insightful questions.

/Patrik
--
Patrik Nordwall
Typesafe - Reactive apps on the JVM
Twitter: @patriknw
Looks just about right. It's notably similar to the current version of my Querki architecture, which is basically a special case of this problem.
You don't say it explicitly, so assumption check: the ShardCoordinator is using Akka Persistence for rollover? In thinking through this problem, rolling over the central coordinator has always been the trickiest issue, but it seems a generally good fit for akka-persistence.
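(To make that assumption concrete, the coordinator's persistent state could be as small as an event-sourced map from shard to region; the event and state names below are made up, and the actual persist/replay plumbing would come from akka-persistence:)

import akka.actor.ActorRef

// Hypothetical events the coordinator would persist and replay on recovery
// (e.g. after moving to another node as a cluster singleton).
sealed trait CoordinatorEvent
case class ShardAllocated(shardId: String, region: ActorRef) extends CoordinatorEvent
case class ShardDeallocated(shardId: String) extends CoordinatorEvent

// The recovered state: which Repository/region currently hosts which shard.
case class CoordinatorState(shards: Map[String, ActorRef] = Map.empty) {
  def updated(event: CoordinatorEvent): CoordinatorState = event match {
    case ShardAllocated(id, region) => copy(shards = shards + (id -> region))
    case ShardDeallocated(id)       => copy(shards = shards - id)
  }
}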
On Tuesday, November 19, 2013 9:56:28 AM UTC-5, patriknw wrote:

- Aggregate Root child actors are created on demand from user-provided Props. An Aggregate Root actor is typically a Processor or EventSourcedProcessor. The Aggregate Root can stop itself when it is idle to reduce memory consumption.
One request: while the above is fine, let's avoid assuming that the "aggregate roots" are Processors. I'm still not sure whether that's going to be the right approach for Querki in the medium term. (My Spaces aren't *terribly* far from Processors, but I suspect it's going to be a while before the APIs are just right for that approach.)
- The ShardCoordinator rebalances the shards, e.g. when new nodes join the cluster. This is the most difficult part. E.g. we must avoid message re-ordering when a Shard is migrated from one Repository to another.
Just a datapoint: for my use case, I don't care an awful lot about aggressive rebalancing. In a system like Querki -- with lots of medium-lived objects -- it can suffice to simply focus on "eventual balancing" as the objects come up and down. (I'm sure that some use cases will care more, but given your comment about this being the hard part, keep in mind that it's not always essential.)
Truth to tell, migration actually worries me slightly. Consider a possible large-scale cluster (which I might want in the long run), where the nodes are geographically dispersed. In this environment, I will generally want the hosting node for a Space to be geographically proximate to its owner (who is expected to be the principal user).

That's a very long-run concern, of course (certainly years off), but the point is that proper balancing may sometimes involve the semantics of the objects being balanced. It's not trivial to do well, and we should keep in mind that we may eventually need to add some traits that do hinting for the balancing.

(My initial assumption has been to simply put the object where the first request came in, which will *usually* result in appropriate geographic proximity. I'm slightly concerned about rebalancing upsetting that, but it's mostly an academic worry for now.)
I picked concrete naming from DDD, but this can really be used for things other than persistent aggregate roots. We could consider more general naming, but that might also make it more difficult to understand. Naming is always hard.

Skipping rebalancing would mean that new nodes will not be used until new shards are requested.

The rebalancing should at least be somewhat configurable. In this first attempt I only trigger a rebalance from the repository with the most shards to the repository with the fewest shards, and only if the difference in number of shards is greater than a specified threshold.
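(Expressed as code, the first-attempt strategy is roughly the following; the types and the way the shards to move are picked are illustrative, not the prototype's actual implementation:)

import akka.actor.ActorRef

// Pick shards to migrate from the repository with the most shards to the one with
// the fewest, but only if the difference exceeds the configured threshold.
def shardsToRebalance(allocations: Map[ActorRef, Set[String]], threshold: Int): Set[String] =
  if (allocations.isEmpty) Set.empty
  else {
    val mostShards  = allocations.values.maxBy(_.size)
    val leastShards = allocations.values.minBy(_.size)
    val difference  = mostShards.size - leastShards.size
    if (difference > threshold) mostShards.take(difference - threshold)
    else Set.empty
  }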
On Wed, Nov 20, 2013 at 9:45 AM, Patrik Nordwall <patrik....@gmail.com> wrote:
I picked concrete naming from DDD, but this can really be used for things other than persistent aggregate roots. We could consider more general naming, but that might also make it more difficult to understand. Naming is always hard.

Yeah, I figured. The "aggregate roots" terminology is kind of a non sequitur for me (since my Spaces are explicitly *not* "domain objects" in the usual sense), but I don't have a better suggestion yet, so I'm not worrying about it a lot. (I do think "shard" makes a lot of sense, the way you're using it.)

Actually, that reminds me to make another assumption check: there is nothing preventing me from having a direct 1-to-1 relationship between my final Actors and the "shards", correct? I suspect that Shard == Space is correct for my needs. (Spaces then control their own hive of sub-Actors, but the Space is really the interesting bit.)
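(If that is allowed, the 1-to-1 mapping is just an identity shard function, e.g.:)

// Hypothetical: one shard per Space, i.e. the shard id is simply the Space (entry) id.
// The coordinator then tracks one location per Space, which should be fine as long
// as the number of live Spaces stays moderate.
def shardIdOf(spaceId: String): String = spaceId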
Skipping rebalancing would mean that new nodes will not be used until new shards are requested.

Just so. It's not terribly necessary for my case, precisely because I expect Spaces to time out, and new ones to be requested, quite frequently. (What I meant by "medium-lived": I expect the average lifespan to be ~15 minutes.) But I'm sure that many DDD applications will be a lot less dynamic than that.
The rebalancing should at least be somewhat configurable. In this first attempt I only trigger a rebalance from the repository with the most shards to the repository with the fewest shards, and only if the difference in number of shards is greater than a specified threshold.

Fair enough. So long as I can, worse comes to worst, turn rebalancing off via config, it shouldn't cause me any problems.
And in the medium term, I can think about how I would want to enhance balancing to fit my requirements well.

Anyway -- very nice, very timely tool! Not sure whether I will pick it up immediately (as previously mentioned, I'm not likely to upgrade to 2.3 until Play is released for it), but knowing that it's coming gives me directional guidance for my designs...
Hi Andrew,

It is a good question. You are thinking of running repositories only on some backend nodes and making them accessible from frontend nodes. For that I was thinking (there is a FIXME) of creating a RepositoryProxy, which only keeps track of shard locations and forwards in a similar way to a real repository, but doesn't host any aggregate roots.
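(A rough sketch of that idea, with hypothetical names; the real FIXME may end up looking quite different. The proxy resolves and forwards exactly like a repository, but never creates Aggregate Root children:)

import akka.actor.{ Actor, ActorRef }

case class GetShardLocation(shardId: String) // hypothetical coordinator protocol message

class RepositoryProxy(coordinator: ActorRef, shardIdOf: Any => String) extends Actor {
  // Cached shard locations, filled in from coordinator replies (handling omitted here).
  var shardLocations = Map.empty[String, ActorRef]

  def receive = {
    case msg =>
      val shardId = shardIdOf(msg)
      shardLocations.get(shardId) match {
        case Some(repository) =>
          repository forward msg
        case None =>
          // Ask the coordinator for the shard's home; a real implementation would
          // buffer msg until the reply arrives, just like a hosting repository does.
          coordinator ! GetShardLocation(shardId)
      }
  }
}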
Hi Patrik,
Below are some observations. I am replying late in the discussion, so several of my comments are based on what has already been kicked around.
Vaughn
On Tuesday, November 19, 2013 7:56:28 AM UTC-7, patriknw wrote:

TL;DR: I have created a prototype for sharding of Aggregate Roots in Akka Cluster.
No, I think it is well stated.
I have been thinking about what we need to have good support for akka-persistence in akka-cluster. For background, there have been several interesting threads on the mailing list that touch this topic. The most related is probably "Creating one instance of an Akka actor per entity (domain-driven-design)".
My conclusion is that we need a sharding mechanism that can allocate Aggregate Root instances to nodes in the cluster and make them transparently accessible from other nodes.
At a high level:

- On each node you run a Repository actor. (Several of these are possible.)
I really dislike this naming, especially when used in a DDD setting. I think that Justin (and others?) makes some really good points about naming, and I believe that naming deserves more sensitivity; it should be very well thought out.
Further below I provide some perspectives that you may want to consider, but I won't be able to resist some forward references. The main reason I say that Repository is inappropriate, even in a DDD setting, is because a Repository should never or almost never be responsible for business logic. The "almost never" is because sometimes you have to get the database, which is backing the Repository, to calculate something for you because doing so in the VM will not perform well.
- All messages to an Aggregate Root go through the Repository.
Okay, but again the naming is weak. All messages to X go through X-Router. Maybe X is Entry (or possibly Entity) and maybe X-Router is Cache or Region.
- Aggregate Roots are grouped into Shards, which is a user provided function from Aggregate Root Id to Shard Id. A Shard is the smallest piece that can be allocated to a Repository and also the smallest piece that is migrated when rebalancing.
Good. This may be old news to some, but I thought it was an interesting finding on my part. If the Aggregate unique Id is a UUID, it is really easy to create a consistent hash like so:
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.UUID;

// Note: actorTypes is assumed to be a field holding the actor type names that are
// hashed together with the key, e.g.:
// private final String[] actorTypes = { "Product", "Order", "Customer" };

private void distributeKeysToPartitions(List<Set<String>> aPartitions, int aTotalKeys) {
    long numberOfPartitions = aPartitions.size();
    Random random = new Random();

    for (int i = 1; i < aTotalKeys + 1; ++i) {
        String key = UUID.randomUUID().toString(); // or String.format("%08d", i) for sequential ids
        long keyHash = key.hashCode();

        // combine the key hash with the hash of a randomly chosen actor type
        int actorTypeIndex = random.nextInt(actorTypes.length);
        long actorTypeHash = actorTypes[actorTypeIndex].hashCode();

        // map the combined hash onto one of the partitions
        long partition = (keyHash + actorTypeHash) % numberOfPartitions;
        if (partition < 0) {
            partition = -partition;
        }
        aPartitions.get((int) partition).add(key);

        // progress output: a dot per 100,000 keys, a newline per 1,000,000
        if (((i + 1) % 100000) == 0) {
            System.out.print(".");
        }
        if (((i + 1) % 1000000) == 0) {
            System.out.println();
        }
    }
}
Believe it or not, across any number of partitions (choose 4 or choose 10,000), and anywhere into the millions of UUID aggregate identities, the keys will hash to almost equal buckets. What I mean by almost equal is that even with 10,000 nodes and 1 million ids, you will see partitions/nodes whose sizes differ by a few hundred to at most a few thousand keys.
That said, this is just one way to produce a random shard/partition/node location. I think that there is high value in considering the possibility, however, that calculating shards isn't always the best idea. Sometimes you just want to name the Cache/Region that you want to be part of. (Again, see further below.)
- There is a ShardCoordinator, which is a central authority for allocating Shards to Repositories. It is a cluster singleton, and has persistent state of the shard locations.
Where is it located?
- When a Repository receives a message for an Aggregate Root (Shard) that it doesn't know about it sends a request to the ShardCoordinator. All messages for that Shard are buffered until the Repository receives the location of the Shard.
- Is this using a Stash? If so, isn't that an unreliable temporary queue?
- If the location of the Shard is the Repository itself the message is delivered to child actor representing the Aggregate Root, otherwise it forwards the message to the responsible Repository. The Repository caches the Shard locations.
- Not sure I follow this.
- Aggregate Root child actors are created on demand from user provided Props. An Aggregate Root actor is typically a Processor or EventSourcedProcessor. The Aggregate Root can stop itself in case it is idle to reduce memory consumption.
- This is why, in my comments below, I think that this effort isn't really a Cache. It is more of a specialized Grid/Fabric.
- Ordinary death watch is used between the Repositories and ShardCoordinator.- The ShardCoordinator rebalances the shards, e.g. when new nodes join the cluster. This is the most difficult part. E.g. we must avoid message re-ordering when a Shard is migrated from one Repository to another.
- Consider evicting Actors from the Shard/Cache/Region when a node is added. Eviction is based on a new hash calculation. When the next message is sent to one of the evicted Actors, the Actor is brought to life again, but now on the new node.
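(A sketch of that eviction idea, reusing the hash placement above; when the partition count changes, recompute each live key's partition and evict the ones that now map elsewhere. Names are illustrative:)

// For each live entry key, recompute its partition under the new partition count and
// return the keys to evict (passivate); each evicted Actor is recreated on its new
// node the next time a message is sent to it.
def keysToEvict(liveKeys: Set[String], oldPartitions: Int, newPartitions: Int): Set[String] = {
  def partitionOf(key: String, partitions: Int): Int = {
    val p = key.hashCode % partitions
    if (p < 0) -p else p
  }
  liveKeys.filter(key => partitionOf(key, oldPartitions) != partitionOf(key, newPartitions))
}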
It is important to highlight that in this solution I don't try to solve:1. Distributed journal. That should be handled by a journal implementation backed by a distributed data/event store.2. Reliable messaging between client and Aggregate Root. That should be solved by channels.3. Better failure detection or management of network partitions than what is provided by Akka Cluster.
- Here are my general comments. I think that what is being developed here is not just for DDD. It is more of a Grid.
It may not be so much a set of Caches, because an Actor is transient. So maybe using the following terms is more appropriate:
-- Grid or Fabric defines the overall concept. I think that Grid is probably sufficient.
-- Each Aggregate is associated with an "area" in the Grid. You could call this a Shard, but maybe it is more like a Region.
It's a Cache, but not a Cache in the sense that Coherence uses Cache. So maybe Region is more versatile.
-- Each Aggregate in a Region (Region is at least a placeholder) is represented as an Entry in the Region. Entries are referenced by clients by an EntryRef.
-- If any of this seems acceptable, then you have a Region and RegionCoordinator, for example. Regions can be specifically named or can be named via an algorithm.
- I think that all of these are a better fit for Justin's concerns. That makes this approach much more applicable to all kinds of uses, not just domain models.
You find the gory details in the code: https://github.com/akka/akka/pull/1843

This is still in the prototype stage and I appreciate any kind of feedback. Try to come up with problematic corner cases.

Looking forward to the discussions on this.
I have updated the pull request based on your feedback. I renamed Aggregate Root to Entry and Repository to ShardRegion.

Implemented missing things such as:
- Limit buffer size
- Support buffering during passivate
- Pluggable shard allocation strategy (and supporting Justin's use case also)
- Java API
- Proxy-only mode
- Extension for ease of use
- SharedLeveldbStore in test

Remaining:
- ScalaDoc and rst docs
- Snapshotting in ShardCoordinator

The primary user API for creating the actors will be the extension, which is illustrated here: https://github.com/akka/akka/blob/b042e176ac5018f468cb28efd89c07a9307dcaac/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala#L413-L451

WDYT?
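(For readers skimming the thread, the shape of the extension-based API is roughly the following. This is a paraphrase with assumed parameter names and a made-up Counter entry and Get message; the linked ClusterShardingSpec is the authoritative version:)

import akka.actor.{ Actor, ActorSystem, Props }
import akka.contrib.pattern.ClusterSharding

// Made-up entry actor and message, just to show the shape of the calls.
case class Get(counterId: Long)
class Counter extends Actor {
  def receive = { case Get(id) => sender() ! 0 }
}

val system = ActorSystem("ClusterSystem")

// Parameter names are assumed; see the linked spec for the real usage.
ClusterSharding(system).start(
  typeName = "Counter",
  entryProps = Some(Props[Counter]),
  idExtractor = { case msg @ Get(id) => (id.toString, msg) },
  shardResolver = { case Get(id) => (id % 10).toString })

// Messages are then sent via the region, from any node in the cluster:
val counterRegion = ClusterSharding(system).shardRegion("Counter")
counterRegion ! Get(42)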
On Tuesday, 3 December 2013 02:20:35 UTC-8, patriknw wrote:
I have updated the pull request based on your feedback.
[...]

WDYT?
Very, very cool. This is properly exciting :-)
Awesome job Patrik.
--
Raymond Roestenburg
Hi Patrik,
Obviously, if Roland is happy it must be great. Given that you implemented my suggestions, some of my similar work is probably redundant, and your understanding at the cluster level is best. Still, I think I may be able to kick in a few goodies. I don't want to just code review; I would prefer to try to use what you've done and see where my further contributions might fit in.

I have not used akka-cluster before. I assume you can run a single-node cluster easily enough. Can I do some work on this without getting deeply involved in build and configuration details? Could you describe the environment for this? I just want to follow a straight line from here to there.