Cluster sharding for akka-persistence


Patrik Nordwall

unread,
Nov 19, 2013, 9:56:28 AM11/19/13
to akka...@googlegroups.com
TL;DR: I have created a prototype for sharding of Aggregate Roots in Akka Cluster.

I have been thinking about what we need to have good support for akka-persistence in akka-cluster.

For background, there have been several interesting threads on the mailing list that touch this topic. The most related is probably "Creating one instance of an Akka actor per entity (domain-driven-design)".

My conclusion is that we need a sharding mechanism that can allocate Aggregate Root instances to nodes in the cluster and make them transparently accessible from other nodes.

At a high level:
- On each node you run a Repository actor. (Several of these are possible)
- All messages to an Aggregate Root go through the Repository.
- Aggregate Roots are grouped into Shards via a user-provided function from Aggregate Root Id to Shard Id (see the sketch after this list). A Shard is the smallest piece that can be allocated to a Repository and also the smallest piece that is migrated when rebalancing.
- There is a ShardCoordinator, which is a central authority for allocating Shards to Repositories. It is a cluster singleton, and has persistent state of the shard locations.
- When a Repository receives a message for an Aggregate Root (Shard) that it doesn't know about, it sends a request to the ShardCoordinator. All messages for that Shard are buffered until the Repository receives the location of the Shard.
- If the location of the Shard is the Repository itself, the message is delivered to a child actor representing the Aggregate Root; otherwise the message is forwarded to the responsible Repository. The Repository caches the Shard locations.
- Aggregate Root child actors are created on demand from user-provided Props. An Aggregate Root actor is typically a Processor or EventSourcedProcessor. The Aggregate Root can stop itself when it is idle to reduce memory consumption.
- Ordinary death watch is used between the Repositories and ShardCoordinator.
- The ShardCoordinator rebalances the shards, e.g. when new nodes join the cluster. This is the most difficult part: for example, we must avoid message re-ordering when a Shard is migrated from one Repository to another.
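
To make the Shard mapping concrete, here is a rough sketch of what such a user-provided function could look like (illustrative only, not code from the prototype):

    // Hash the Aggregate Root Id into a fixed number of Shards. All ids that map to
    // the same Shard Id end up in the same Shard, and therefore in the same Repository.
    val numberOfShards = 10

    def shardId(aggregateRootId: String): String =
      math.abs(aggregateRootId.hashCode % numberOfShards).toString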

It is important to highlight that in this solution I don't try to solve:
1. Distributed journal. That should be handled by a journal implementation backed by a distributed data/event store.
2. Reliable messaging between client and Aggregate Root. That should be solved by channels.
3. Better failure detection or management of network partitions than what is provided by Akka Cluster.

You can find the gory details in the code: https://github.com/akka/akka/pull/1843

This is still in the prototype stage and I appreciate any kind of feedback. Try to come up with problematic corner cases.

Looking forward to the discussions on this.



--

Patrik Nordwall
Typesafe Reactive apps on the JVM
Twitter: @patriknw

Andrew Easter

unread,
Nov 19, 2013, 6:46:49 PM11/19/13
to akka...@googlegroups.com
Hi Patrik,

I have some feedback. I'm afraid it's mainly questions to help me better understand the solution!

See below...

Andrew


On Tuesday, 19 November 2013 06:56:28 UTC-8, patriknw wrote:
TL;DR: I have created a prototype for sharding of Aggregate Roots in Akka Cluster.

I have been thinking about what we need to have good support for akka-persistence in akka-cluster.

For background, there have been several interesting threads on the mailing list that touch this topic. The most related is probably "Creating one instance of an Akka actor per entity (domain-driven-design)".

My conclusion is that we need a sharding mechanism that can allocate Aggregate Root instances to nodes in the cluster and make them transparently accessible from other nodes.

At a high level:
- On each node you run a Repository actor. (Several of these are possible)

So, you run a Repository actor, for every aggregate root type, on every node? 

- All messages to an Aggregate Root go through the Repository.

So, a Repository is somewhat analogous to what we'd previously discussed with regard to a parent coordinator? We'd previously talked about this being a cluster singleton but obviously weren't satisfied with that approach (hence your new solution!).
 
- Aggregate Roots are grouped into Shards, which is a user provided function from Aggregate Root Id to Shard Id. A Shard is the smallest piece that can be allocated to a Repository and also the smallest piece that is migrated when rebalancing.

Aggregate root types can all have different sharding strategies?
 
- There is a ShardCoordinator, which is a central authority for allocating Shards to Repositories. It is a cluster singleton, and has persistent state of the shard locations.

There is only one ShardCoordinator for all aggregate root types?
 
- When a Repository receives a message for an Aggregate Root (Shard) that it doesn't know about it sends a request to the ShardCoordinator. All messages for that Shard are buffered until the Repository receives the location of the Shard.
- If the location of the Shard is the Repository itself the message is delivered to child actor  representing the Aggregate Root, otherwise it forwards the message to the responsible Repository. The Repository caches the Shard locations.

This is probably an ill-informed question, but how does a client choose which Repository to talk to? Obviously it can talk to any one (as messages will be forwarded if necessary) but, still, is there some de facto way of deciding which node to hit?
 
- Aggregate Root child actors are created on demand from user provided Props. An Aggregate Root actor is typically a Processor or EventSourcedProcessor. The Aggregate Root can stop itself in case it is idle to reduce memory consumption.

It's not clear to me where an aggregate times itself out. I certainly see nothing that resembles this discussion: https://groups.google.com/d/msg/akka-user/OH_38apIzAs/oonugFy2zKQJ. I can see the use of ReceiveTimeout in RebalanceWorker but I'm not able to understand how this relates to aggregate root timeout.
 
- Ordinary death watch is used between the Repositories and ShardCoordinator.
- The ShardCoordinator rebalances the shards, e.g. when new nodes join the cluster. This is the most difficult part. E.g. we must avoid message re-ordering when a Shard is migrated from one Repository to another. 

What happens to new messages that arrive during rebalancing? It looks like PoisonPill is used which should allow for existing messages to be consumed before termination. What happens, though, to messages that arrive after the PoisonPill whilst rebalancing is taking place? I've most likely missed something in the code...
 
It is important to highlight that in this solution I don't try to solve:
1. Distributed journal. That should be handled by a journal implementation backed by a distributed data/event store.
2. Reliable messaging between client and Aggregate Root. That should be solved by channels.

 So, if a node goes down, I assume any messages within an aggregate root's mailbox would be lost (unless doing something with channels)?
 
3. Better failure detection or management of network partitions than what is provided by Akka Cluster.

Just so I understand, are you implying that, in theory, you could end up in a situation where you have two separate clusters, each one potentially making changes to the same aggregate roots? 

Patrik Nordwall

unread,
Nov 20, 2013, 2:21:46 AM11/20/13
to akka...@googlegroups.com
Hi Andrew,


On Wed, Nov 20, 2013 at 12:46 AM, Andrew Easter <andrew...@gmail.com> wrote:
Hi Patrik,

I have some feedback. I'm afraid it's mainly questions to help me understand better the solution!

See below...

Andrew


On Tuesday, 19 November 2013 06:56:28 UTC-8, patriknw wrote:
TL;DR: I have created a prototype for sharding of Aggregate Roots in Akka Cluster.

I have been thinking about what we need to have a good support for akka-persistence in akka-cluster.

For background there has been several interesting threads on the mailing list that touch this topic. The most related is probably "Creating one instance of an Akka actor per entity (domain-driven-design)"

My conclusion is that we need a sharding mechanism that can allocate Aggregate Root instances to nodes in the cluster and make them transparently accessible from other nodes.

At a high level:
- On each node you run a Repository actor. (Several of these are possible)

So, you run a Repository actor, for every aggregate root type, on every node? 

Yes. When you create the Repository actor you pass in the Props for the aggregate root actor, and that means one Repository actor per aggregate root type.

We can think about whether one Repository should be able to host several aggregate root types. That would require a mapping from incoming message to Props, and I don't see a strong reason for supporting this.

I will probably create an Akka Extension that makes it convenient to use.
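
Until then, as a rough illustration only (Repository.props, OrderAggregateRoot and the parameter names are made up for this sketch, not the prototype API), creating a repository for one aggregate root type could look something like:

    import akka.actor.{ ActorSystem, Props }

    val system = ActorSystem("ClusterSystem")

    // One Repository per aggregate root type, created with that type's Props and shard
    // function. A CustomerAggregateRoot would get its own Repository in the same way.
    val orderRepository = system.actorOf(
      Repository.props(
        aggregateRootProps = Props[OrderAggregateRoot],
        shardId = (id: String) => math.abs(id.hashCode % 10).toString),
      name = "orderRepository")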
 

- All messages to an Aggregate Root goes through the Repository.

So, a Repository is somewhat analogous to what we'd previously discussed with regard to a parent coordinator? We'd previously talked about this being a cluster singleton but obviously weren't satisfied with that approach (hence your new solution!).

Correct. The cluster singleton solution is limited in scalability to the number of aggregate root instances that can run on one node.
 

 
- Aggregate Roots are grouped into Shards, which is a user provided function from Aggregate Root Id to Shard Id. A Shard is the smallest piece that can be allocated to a Repository and also the smallest piece that is migrated when rebalancing.

Aggregate root types can all have different sharding strategy?

Yes, the sharding strategy is defined when creating the repository.
 

 
- There is a ShardCoordinator, which is a central authority for allocating Shards to Repositories. It is a cluster singleton, and has persistent state of the shard locations.

There is only one ShardCoordinator for all aggregate root types?

No, one coordinator per aggregate root type, since we said that the repository is per aggregate root type. Repositories hosting the same type of aggregate roots must use the same central coordinator. When you create the repository you define the logical path of the coordinator, and the repository will register at that path on the oldest cluster member (i.e. the known singleton location).
 

 
- When a Repository receives a message for an Aggregate Root (Shard) that it doesn't know about it sends a request to the ShardCoordinator. All messages for that Shard are buffered until the Repository receives the location of the Shard.
- If the location of the Shard is the Repository itself the message is delivered to child actor  representing the Aggregate Root, otherwise it forwards the message to the responsible Repository. The Repository caches the Shard locations.

This is probably an ill-informed question, but how does a client choose which Repository to talk to? Obviously it can talk to any one (as messages will be forward if necessary) but, still, is there some defacto way of deciding which node to hit?

It is a good question. You are thinking about running repositories only on some backend nodes and making them accessible from frontend nodes. For that I was thinking (there is a FIXME) of creating a RepositoryProxy, which only keeps track of shard locations and forwards in a similar way to a real repository, but it doesn't host any aggregate roots.

The client should talk to the same repository instance if message ordering between client and aggregate root is important.
 

 
- Aggregate Root child actors are created on demand from user provided Props. An Aggregate Root actor is typically a Processor or EventSourcedProcessor. The Aggregate Root can stop itself in case it is idle to reduce memory consumption.

It's not clear to me where an aggregate times itself out? Certainly nothing that resembles this discussion: https://groups.google.com/d/msg/akka-user/OH_38apIzAs/oonugFy2zKQJ. I can see use of ReceiveTimeout in RebalanceWorker but I'm not able to understand how this is related to aggregate root timeout?

That should be implemented in the application-specific aggregate root actor, but I plan to support the buffering of messages during the stop period as discussed somewhere.
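
Something like this in the application's aggregate root actor would do for the timeout itself (just a sketch, without the buffering mentioned above):

    import akka.actor.{ Actor, ReceiveTimeout }
    import scala.concurrent.duration._

    class MyAggregateRoot extends Actor {
      // Stop this instance after two minutes without messages to reduce memory
      // consumption. Messages arriving while it is stopping are not buffered here.
      context.setReceiveTimeout(2.minutes)

      def receive = {
        case ReceiveTimeout => context.stop(self)
        case command        => () // handle the domain command here
      }
    }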
 

 
- Ordinary death watch is used between the Repositories and ShardCoordinator.
- The ShardCoordinator rebalances the shards, e.g. when new nodes join the cluster. This is the most difficult part. E.g. we must avoid message re-ordering when a Shard is migrated from one Repository to another. 

What happens to new messages that arrive during rebalancing? It looks like PoisonPill is used which should allow for existing messages to be consumed before termination. What happens, though, to messages that arrive after the PoisonPill whilst rebalancing is taking place? I've most likely missed something in the code...

The shard is de-registered in BeginHandOff, which means that all incoming messages are buffered in the shardBuffer, and the coordinator will not answer GetShardHome requests until the handoff is completed. Incoming messages will thus be buffered in the repository and then delivered to the final destination when the handoff is completed.
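
In pseudo-code the repository side works roughly like this (simplified sketch; GetShardHome, ShardHome and the other names are stand-ins, not the identifiers used in the pull request):

    import akka.actor.{ Actor, ActorRef, Props }

    case class GetShardHome(shard: String)
    case class ShardHome(shard: String, home: ActorRef)

    class RepositorySketch(coordinator: ActorRef, entryProps: Props,
                           entryId: Any => String, shardIdOf: Any => String) extends Actor {
      var homes  = Map.empty[String, ActorRef]                  // cached shard locations
      var buffer = Map.empty[String, Vector[(Any, ActorRef)]]   // messages awaiting a location

      def receive = {
        case ShardHome(shard, home) =>                          // answer from the coordinator
          homes += (shard -> home)
          buffer.getOrElse(shard, Vector.empty).foreach { case (m, s) => home.tell(m, s) }
          buffer -= shard
        case msg =>
          val shard = shardIdOf(msg)
          homes.get(shard) match {
            case Some(home) if home == self =>                  // local shard: deliver to the child
              val name = entryId(msg)
              context.child(name).getOrElse(context.actorOf(entryProps, name)) forward msg
            case Some(home) =>
              home forward msg                                  // remote shard: forward
            case None =>                                        // unknown shard: ask once, buffer
              if (!buffer.contains(shard)) coordinator ! GetShardHome(shard)
              buffer += (shard -> (buffer.getOrElse(shard, Vector.empty) :+ ((msg, sender()))))
          }
      }
    }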
 

 
It is important to highlight that in this solution I don't try to solve:
1. Distributed journal. That should be handled by a journal implementation backed by a distributed data/event store.
2. Reliable messaging between client and Aggregate Root. That should be solved by channels.

 So, if a node goes down, I assume any messages within an aggregate root's mailbox would be lost (unless doing something with channels)?

Yes, end-to-end reliable messaging should be done with channels.
 

 
3. Better failure detection or management of network partitions than what is provided by Akka Cluster.

Just so I understand, are you implying that, in theory, you could end up in a situation where you have two separate clusters, each one potentially making changes to the same aggregate roots? 

If you use auto-down it will form two separate clusters, which will result in two coordinators and therefore possibly two active aggregate roots with the same id. Instead you should use manual downing, or more carefully crafted automatic downing of unreachable nodes.

It would be nice if the distributed journal could detect and prevent more than one active writable processor per processor id, but that is probably difficult.

Thank you for insightful questions.
/Patrik
 


Martin Krasser

unread,
Nov 20, 2013, 6:19:22 AM11/20/13
to akka...@googlegroups.com
Channels plus a reliable delivery mechanism (can be ReliableProxy, a 3rd party message broker or whatever). See also http://doc.akka.io/docs/akka/snapshot/scala/persistence.html#message-re-delivery


 

 
3. Better failure detection or management of network partitions than what is provided by Akka Cluster.

Just so I understand, are you implying that, in theory, you could end up in a situation where you have two separate clusters, each one potentially making changes to the same aggregate roots? 

If you use auto-down it will form two separate clusters, which will result in two coordinators and therefore possibly two active aggregate roots with the same id. Instead you should use the manual downing or more carefully crafted automatic downing of unreachable nodes.

It would be nice if the distributed journal could detect and prevent more than one active writable processor per processor id, but that is probably difficult.

I think this boils down to the ability to implement a distributed lock. Trivial to implement with Zookeeper, for example, but maybe not something we should require in general from distributed storage backends.
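
For illustration (my sketch with Apache Curator; nothing in the journal SPI requires this), a per-processor writer lock could look like:

    import java.util.concurrent.TimeUnit
    import org.apache.curator.framework.CuratorFrameworkFactory
    import org.apache.curator.framework.recipes.locks.InterProcessMutex
    import org.apache.curator.retry.ExponentialBackoffRetry

    val zk = CuratorFrameworkFactory.newClient("zkhost:2181", new ExponentialBackoffRetry(1000, 3))
    zk.start()

    // Hold a lock per processorId while the writable processor is active, so a second
    // writer for the same processorId cannot start elsewhere in a (split) cluster.
    def withWriterLock[T](processorId: String)(body: => T): Option[T] = {
      val lock = new InterProcessMutex(zk, s"/akka/writers/$processorId")
      if (lock.acquire(5, TimeUnit.SECONDS)) {
        try Some(body) finally lock.release()
      } else None // another node already holds the writer lock
    }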




Justin du coeur

unread,
Nov 20, 2013, 9:12:03 AM11/20/13
to akka...@googlegroups.com
Looks just about right.  It's notably similar to the current version of my Querki architecture, which is basically a special case of this problem.

You don't say it explicitly, so assumption check: the ShardCoordinator is using Akka Persistence for rollover?  In thinking through this problem, rolling over the central coordinator has always been the trickiest issue, but it seems a generally good fit for akka-persistence.

On Tuesday, November 19, 2013 9:56:28 AM UTC-5, patriknw wrote:
- Aggregate Root child actors are created on demand from user provided Props. An Aggregate Root actor is typically a Processor or EventSourcedProcessor. The Aggregate Root can stop itself in case it is idle to reduce memory consumption.

One request: while the above is fine, let's avoid assuming that the "aggregate roots" are Processors.  I'm still not sure whether that's going to be the right approach for Querki in the medium term.  (My Spaces aren't *terribly* far from Processors, but I suspect it's going to be a while before the APIs are just right for that approach.)
 
- The ShardCoordinator rebalances the shards, e.g. when new nodes join the cluster. This is the most difficult part. E.g. we must avoid message re-ordering when a Shard is migrated from one Repository to another. 

Just a datapoint: for my use case, I don't care an awful lot about aggressive rebalancing.  In a system like Querki -- with lots of medium-lived objects -- it can suffice to simply focus on "eventual balancing" as the objects come up and down.

(I'm sure that some use cases will care more, but given your comment about this being the hard part, keep in mind that it's not always essential.)

Truth to tell, migration actually worries me slightly.  Consider a possible large-scale cluster (which I might want in the long run), where the nodes are geographically dispersed.  In this environment, I will generally want the hosting node for a Space to be geographically proximate to its owner.  (Who is expected to be the principal user.)

That's a very long-run concern, of course (certainly years off), but the point is that proper balancing may sometimes involve the semantics of the objects being balanced.  It's not trivial to do well, and we should keep in mind that we may eventually need to add some traits that do hinting for the balancing.

(My initial assumptions have been to simply put the object where the first request came in, which will *usually* result in appropriate geographic proximity.  I'm slightly concerned about rebalancing upsetting that, but it's mostly an academic worry for now.)

Justin du coeur

unread,
Nov 20, 2013, 9:31:03 AM11/20/13
to akka...@googlegroups.com
Oh, right -- I knew I'd forgotten something:


On Wednesday, November 20, 2013 9:12:03 AM UTC-5, Justin du coeur wrote:
That's a very long-run concern, of course (certainly years off), but the point is that proper balancing may sometimes involve the semantics of the objects being balanced.  It's not trivial to do well, and we should keep in mind that we may eventually need to add some traits that do hinting for the balancing.

Just to underscore this: in my use case, Spaces (the target objects) are likely to vary in both size and traffic by a factor of a thousand.  Some will be high-traffic, some low; some will have a lot of content in memory, some much less.

For the time being, I'm mostly counting on the law of large numbers to usually keep things sane (I specifically don't expect any single Space to be *really* big for the moment), but if we are *serious* about balancing well, this implies that each one should expose some sort of hint about its "weight", and that weight has multiple dimensions.  Conceptually, it's not far off from the node metrics stuff, and that's complex...

Patrik Nordwall

unread,
Nov 20, 2013, 9:45:18 AM11/20/13
to akka...@googlegroups.com
Hi Justin,


On Wed, Nov 20, 2013 at 3:12 PM, Justin du coeur <jduc...@gmail.com> wrote:
Looks just about right.  It's notably similar to the current version of my Querki architecture, which is basically a special case of this problem.

Thanks for the pointer.
 

You don't say it explicitly, so assumption check: the ShardCoordinator is using Akka Persistence for rollover?  In thinking through this problem, rolling over the central coordinator has always been the trickiest issue, but it seems a generally good fit for akka-persistence.

Yes, it uses akka-persistence. I implied that with "has persistent state"
 

On Tuesday, November 19, 2013 9:56:28 AM UTC-5, patriknw wrote:
- Aggregate Root child actors are created on demand from user provided Props. An Aggregate Root actor is typically a Processor or EventSourcedProcessor. The Aggregate Root can stop itself in case it is idle to reduce memory consumption.

One request: while the above is fine, let's avoid assuming that the "aggregate roots" are Processors.  I'm still not sure whether that's going to be the right approach for Querki in the medium term.  (My Spaces aren't *terribly* far from Processors, but I suspect it's going to be a while before the APIs are just right for that approach.)

I agree. There is nothing in the code that assumes that the aggregate roots are processors.
I picked concrete naming from DDD, but this can really be used for other things than persistent aggregate roots. We could consider using more general naming, but that might also make it more difficult to understand. Naming is always hard.
 
 
- The ShardCoordinator rebalances the shards, e.g. when new nodes join the cluster. This is the most difficult part. E.g. we must avoid message re-ordering when a Shard is migrated from one Repository to another. 

Just a datapoint: for my use case, I don't care an awful lot about aggressive rebalancing.  In a system like Querki -- with lots of medium-lived objects -- it can suffice to simply focus on "eventual balancing" as the objects come up and down.

(I'm sure that some use cases will care more, but given your comment about this being the hard part, keep in mind that it's not always essential.)


Skipping rebalancing would mean that new nodes will not be used until new shards are requested. The rebalancing should at least be somewhat configurable. In this first attempt I only trigger a rebalance from the repository with the most shards to the repository with the fewest shards if the difference in number of shards is greater than a specified threshold.
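
In code the rule is roughly the following (simplified sketch, not the exact implementation in the PR):

    import akka.actor.ActorRef

    // Move one shard from the most loaded repository to the least loaded one, but only
    // if the difference in number of shards exceeds the configured threshold.
    def shardToRebalance(shards: Map[ActorRef, Set[String]], threshold: Int): Option[String] =
      if (shards.isEmpty) None
      else {
        val mostLoaded  = shards.values.maxBy(_.size)
        val leastLoaded = shards.values.minBy(_.size)
        if (mostLoaded.size - leastLoaded.size > threshold) mostLoaded.headOption else None
      }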
 
Truth to tell, migration actually worries me slightly.  Consider a possible large-scale cluster (which I might want in the long run), where the nodes are geographically dispersed.  In this environment, I will generally want the hosting node for a Space to be geographically proximate to its owner.  (Who is expected to be the principal user.)

That's a very long-run concern, of course (certainly years off), but the point is that proper balancing may sometimes involve the semantics of the objects being balanced.  It's not trivial to do well, and we should keep in mind that we may eventually need to add some traits that do hinting for the balancing.

(My initial assumptions have been to simply put the object where the first request came in, which will *usually* result in appropriate geographic proximity.  I'm slightly concerned about rebalancing upsetting that, but it's mostly an academic worry for now.)

Yes, there are lots of interesting variations for shard allocation. In this first attempt I simply pick the repository with the least number of shards.

Thank you for feedback.

/Patrik
 


Justin du coeur

unread,
Nov 20, 2013, 11:35:26 AM11/20/13
to akka...@googlegroups.com
On Wed, Nov 20, 2013 at 9:45 AM, Patrik Nordwall <patrik....@gmail.com> wrote:
I picked concrete naming from DDD, but this can really be used for other things than persistent aggregate roots. We could consider using more general naming, but that might also make it more difficult to understand. Naming is always hard.

Yeah, I figured.  The "aggregate roots" terminology is kind of a non sequitur for me (since my Spaces are explicitly *not* "domain objects" in the usual sense), but I don't have a better suggestion yet, so I'm not worrying about it a lot.  (I do think "shard" makes a lot of sense, the way you're using it.)

Actually, that reminds me to make another assumption check: there is nothing preventing me from having a direct 1-to-1 relationship between my final Actors and the "shards", correct?  I suspect that Shard == Space is correct for my needs.  (Spaces then control their own hive of sub-Actors, but the Space is really the interesting bit.)

Skipping rebalancing would mean that new nodes will not be used until new shards are requested.

Just so.  It's not terribly necessary for my case, precisely because I expect Spaces to time out, and new ones to be requested, quite frequently.  (What I meant by "medium-lived": I expect the average lifespan to be ~15 minutes.)  But I'm sure that many DDD applications will be a lot less dynamic than that.
 
The rebalancing should at least be somewhat configurable. In this first attempt I only trigger a rebalance from the repository with the most shards to the repository with the fewest shards if the difference in number of shards is greater than a specified threshold.

Fair enough.  So long as I can, worse comes to worst, turn rebalancing off via config, it shouldn't cause me any problems.  And in the medium term, I can think about how I would want to enhance balancing to fit my requirements well.


Anyway -- very nice, very timely tool!  Not sure whether I will pick it up immediately (as previously mentioned, I'm not likely to upgrade to 2.3 until Play is released for it), but knowing that it's coming gives me directional guidance for my designs...

Patrik Nordwall

unread,
Nov 20, 2013, 1:12:53 PM11/20/13
to akka...@googlegroups.com
On Wed, Nov 20, 2013 at 5:35 PM, Justin du coeur <jduc...@gmail.com> wrote:
On Wed, Nov 20, 2013 at 9:45 AM, Patrik Nordwall <patrik....@gmail.com> wrote:
I picked concrete naming from DDD, but this can really be used for other things than persistent aggregate roots. We could consider using more general naming, but that might also make it more difficult to understand. Naming is always hard.

Yeah, I figured.  The "aggregate roots" terminology is kind of a non-sequiteur for me (since my Spaces are explicitly *not* "domain objects" in the usual sense), but I don't have a better suggestion yet, so I'm not worrying about it a lot.  (I do think "shard" makes a lot of sense, the way you're using it.)

Actually, that reminds me to make another assumption check: there is nothing preventing me from having a direct 1-to-1 relationship between my final Actors and the "shards", correct?  I suspect that Shard == Space is correct for my needs.  (Spaces then control their own hive of sub-Actors, but the Space is really the interesting bit.)

You can use a one-to-one mapping between shard and aggregate root by using the aggregate root id as shard id.
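
In other words, the user-provided function can simply return the id itself (sketch):

    // One entry per shard: Shard Id == Aggregate Root Id.
    def shardId(aggregateRootId: String): String = aggregateRootId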
 

Skipping rebalancing would mean that new nodes will not be used until new shards are requested.

Just so.  It's not terribly necessary for my case, precisely because I expect Spaces to time out, and new ones to be requested, quite frequently.  (What I meant by "medium-lived": I expect the average lifespan to be ~15 minutes.)  But I'm sure that many DDD applications will be a lot less dynamic than that.

I understand. 
 
 
The rebalancing should at least be somewhat configurable. In this first attempt I only trigger a rebalance from the repository with the most shards to the repository with the fewest shards if the difference in number of shards is greater than a specified threshold.

Fair enough.  So long as I can, worse comes to worst, turn rebalancing off via config, it shouldn't cause me any problems.

Sure, setting the threshold to a high value would effectively turn off rebalancing.
I'm not sure that this threshold is the final solution though. Input is very valuable.
 
 And in the medium term, I can think about how I would want to enhance balancing to fit my requirements well.


Anyway -- very nice, very timely tool!  Not sure whether I will pick it up immediately (as previously mentioned, I'm not likely to upgrade to 2.3 until Play is released for it), but knowing that it's coming gives me directional guidance for my designs...

excellent

/Patrik
 


Andrew Easter

unread,
Nov 20, 2013, 1:20:22 PM11/20/13
to akka...@googlegroups.com
On Tuesday, 19 November 2013 23:21:46 UTC-8, patriknw wrote:
Hi Andrew,


It is a good question. You are thinking about running repositories only on some backend nodes and make them accessible from frontend nodes. For that I was thinking (there is a FIXME) of creating a RepositoryProxy, which only keeps track of shard locations and forwards in similar way to a real repository, but it doesn't host any aggregate roots.

I'd actually been thinking about this before I read your reply. I see it as kind of analogous to read-only node clients in Elasticsearch: http://www.elasticsearch.org/guide/en/elasticsearch/client/java-api/current/client.html#node-client.

Andrew

Vaughn Vernon

unread,
Nov 20, 2013, 10:00:30 PM11/20/13
to akka...@googlegroups.com
Hi Patrik,

Below are some observations. I am replying late in the discussion, so several of my comments are based on what has already been kicked around.

Vaughn



On Tuesday, November 19, 2013 7:56:28 AM UTC-7, patriknw wrote:
TL;DR: I have created a prototype for sharding of Aggregate Roots in Akka Cluster.

No, I think it is well stated.
 

I have been thinking about what we need to have a good support for akka-persistence in akka-cluster.

For background there has been several interesting threads on the mailing list that touch this topic. The most related is probably "Creating one instance of an Akka actor per entity (domain-driven-design)"

My conclusion is that we need a sharding mechanism that can allocate Aggregate Root instances to nodes in the cluster and make them transparently accessible from other nodes.

At a high level:
- On each node you run a Repository actor. (Several of these are possible)

I really dislike this naming, especially when used in a DDD setting. I think that Justin (others?) makes some really good points about naming, and I believe that naming deserves more sensitivity and should be very well thought out. Further below I provide some perspectives that you may want to consider, but I won't be able to resist some forward references. The main reason I say that Repository is inappropriate, even in a DDD setting, is that a Repository should never, or almost never, be responsible for business logic. The "almost never" is because sometimes you have to get the database backing the Repository to calculate something for you, because doing so in the VM will not perform well.
 
- All messages to an Aggregate Root goes through the Repository.

Okay, but again the naming is weak. All messages to X go through X-Router. Maybe X is Entry (or possibly Entity) and maybe X-Router is Cache or Region.

 
- Aggregate Roots are grouped into Shards, which is a user provided function from Aggregate Root Id to Shard Id. A Shard is the smallest piece that can be allocated to a Repository and also the smallest piece that is migrated when rebalancing.

Good. This may be old news to some, but I thought it was an interesting finding on my part. If the Aggregate's unique Id is a UUID, it is really easy to create a consistent hash like so:

    private void distributeKeysToPartitions(List<Set<String>> aPartitions, int aTotalKeys) {
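        // Note: this snippet assumes imports of java.util.List, java.util.Set,
        // java.util.Random and java.util.UUID, and a String[] actorTypes field
        // defined elsewhere in the enclosing class.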
        long numberOfPartitions = aPartitions.size();

        for (int i = 1; i < aTotalKeys + 1; ++i) {
            String key = UUID.randomUUID().toString(); // String.format("%08d", i);

            long keyHash = key.hashCode();
            Random random = new Random();
            int actorTypeIndex = random.nextInt(actorTypes.length);
            long actorTypeHash = actorTypes[actorTypeIndex].hashCode();

            long partition = (keyHash + actorTypeHash) % numberOfPartitions;

            if (partition < 0) {
                partition = -partition;
            }

            aPartitions.get((int) partition).add(key);

            if (((i + 1) % 100000) == 0) {
                System.out.print(".");
            }
            if (((i + 1) % 1000000) == 0) {
                System.out.println();
            }
        }
    }

Believe it or not, across any number of partitions (choose 4 or choose 10,000), and anywhere into the millions of UUID aggregate identities, the keys will hash to almost equal buckets. What I mean by almost equal is that even with 10,000 nodes and 1MM ids, you will see partitions/nodes with a spread of a few hundred to at most a few thousand difference.

That said, this is just one way to produce a random shard/partition/node location. I think that there is high value in considering the possibility, however, that calculating shards isn't always the best idea. Sometimes you just want to name the Cache/Region that you want to be part of. (Again, see further below.)

 
- There is a ShardCoordinator, which is a central authority for allocating Shards to Repositories. It is a cluster singleton, and has persistent state of the shard locations.

Where is it located?
 
- When a Repository receives a message for an Aggregate Root (Shard) that it doesn't know about it sends a request to the ShardCoordinator. All messages for that Shard are buffered until the Repository receives the location of the Shard.

- Is this using a Stash? If so, isn't that an unreliable temporary queue?
 
- If the location of the Shard is the Repository itself the message is delivered to child actor  representing the Aggregate Root, otherwise it forwards the message to the responsible Repository. The Repository caches the Shard locations.

- Not sure I follow this.

 
- Aggregate Root child actors are created on demand from user provided Props. An Aggregate Root actor is typically a Processor or EventSourcedProcessor. The Aggregate Root can stop itself in case it is idle to reduce memory consumption.

- This is why, in my comments below, I think that this effort isn't really a Cache. It is more of a specialized Grid/Fabric.
 
- Ordinary death watch is used between the Repositories and ShardCoordinator.
- The ShardCoordinator rebalances the shards, e.g. when new nodes join the cluster. This is the most difficult part. E.g. we must avoid message re-ordering when a Shard is migrated from one Repository to another. 

- Consider evicting Actors from the Shard/Cache/Region when a node is added. Eviction is based on a new hash calculation. When the next message is sent to one of the evicted Actors, the Actor is brought to life again but now on the new node.
 

It is important to highlight that in this solution I don't try to solve:
1. Distributed journal. That should be handled by a journal implementation backed by a distributed data/event store.
2. Reliable messaging between client and Aggregate Root. That should be solved by channels.
3. Better failure detection or management of network partitions than what is provided by Akka Cluster.

- Here are my general comments. I think that what is being developed here is not just for DDD. It is more a Grid. It may not be so much a set of Caches because an Actor is transient. So maybe using the following terms would be more appropriate:

  -- Grid or Fabric defines the overall concept. I think that Grid is probably sufficient.
  -- Each Aggregate is associated with an "area" in the Grid. You could call this a Shard, but maybe it is more like a Region.
     It's a Cache, but not a Cache in the sense that Coherence uses Cache. So maybe Region is more versatile.
  -- Each Aggregate in a Region (Region is at least a placeholder) is represented as an Entry in the Region. Entries are referenced by clients via an EntryRef.
  -- If any of this seems acceptable, then you have a Region and RegionCoordinator, for example. Regions can be specifically named or can be named via an algorithm.
 
- I think that all of these are a better fit for Justin's concerns. That makes this approach much more applicable to all kinds of uses, not just domain models.

Patrik Nordwall

unread,
Nov 21, 2013, 3:12:00 AM11/21/13
to akka...@googlegroups.com
Hi Vaughn,


On Thu, Nov 21, 2013 at 4:00 AM, Vaughn Vernon <vve...@shiftmethod.com> wrote:
Hi Patrik,

Below are some observations. I am replying late in the discussion, so several of my comments are based on what has already been kicked around.

Vaughn



On Tuesday, November 19, 2013 7:56:28 AM UTC-7, patriknw wrote:
TL;DR: I have created a prototype for sharding of Aggregate Roots in Akka Cluster.

No, I think it is well stated.

:-)
 
 

I have been thinking about what we need to have a good support for akka-persistence in akka-cluster.

For background there has been several interesting threads on the mailing list that touch this topic. The most related is probably "Creating one instance of an Akka actor per entity (domain-driven-design)"

My conclusion is that we need a sharding mechanism that can allocate Aggregate Root instances to nodes in the cluster and make them transparently accessible from other nodes.

At a high level:
- On each node you run a Repository actor. (Several of these are possible)

I really dislike this naming, especially when used in a DDD setting. I think that Justin (others?) makes some really good points about naming, and I believe that attention to naming should be more sensitive; very well thought out.

I agree, and I'm very open to completely different naming.
 
Further below I provide some perspectives that you may want to consider, but I won't be able to resist some forward references. The main reason I say that Repository is inappropriate, even in a DDD setting, is because a Repository should never or almost never be responsible for business logic. The "almost never" is because sometimes you have to get the database, which is backing the Repository, to calculate something for you because doing so in the VM will not perform well.
 
- All messages to an Aggregate Root goes through the Repository.

Okay, but again the naming is weak. All messages to X goes through X-Router. Maybe X is Entry (or possibly Entity) and maybe X-Router is Cache or Region.

 
- Aggregate Roots are grouped into Shards, which is a user provided function from Aggregate Root Id to Shard Id. A Shard is the smallest piece that can be allocated to a Repository and also the smallest piece that is migrated when rebalancing.

Good. This may be old news to some, but I thought it was an interesting finding on my part. If the Aggregate unique Id is a UUID, it is really easy to create a consistent hash like so:

    private void distributeKeysToPartitions(List<Set<String>> aPartitions, int aTotalKeys) {
        long numberOfPartitions = aPartitions.size();

        for (int i = 1; i < aTotalKeys + 1; ++i) {
            String key = UUID.randomUUID().toString(); // String.format("%08d", i);

            long keyHash = key.hashCode();
            Random random = new Random();
            int actorTypeIndex = random.nextInt(actorTypes.length);
            long actorTypeHash = actorTypes[actorTypeIndex].hashCode();

            long partition = (keyHash + actorTypeHash) % numberOfPartitions;

            if (partition < 0) {
                partition = -partition;
            }

            aPartitions.get((int) partition).add(key);

            if (((i + 1) % 100000) == 0) {
                System.out.print(".");
            }
            if (((i + 1) % 1000000) == 0) {
                System.out.println();
            }
        }
    }

Believe it or not, across any number of partitions (choose 4 or choose 10,000), and anywhere into the millions of UUID aggregate identities, the keys will hash to almost equal buckets. What I mean by almost equal is that with even 10,000 nodes and 1MM ids, you will see partitions/nodes with spread within a few hundred to at most a few thousand difference.

That said, this is just one way to produce a random shard/partition/node location. I think that there is high value in considering the possibility, however, that calculating shards isn't always the best idea. Sometimes you just want to name the Cache/Region that you want to be part of. (Again, see further below.)

Thanks for sharing. Yes, it is important that the sharding function can be defined by the user.
 

 
- There is a ShardCoordinator, which is a central authority for allocating Shards to Repositories. It is a cluster singleton, and has persistent state of the shard locations.

Where is it located?

The cluster singleton actor is located on the oldest member in the cluster, i.e. the node that was started first. When that node is removed, the singleton fails over to the new oldest member. You can group nodes with roles, so you can have a singleton per role.
 
 
- When a Repository receives a message for an Aggregate Root (Shard) that it doesn't know about it sends a request to the ShardCoordinator. All messages for that Shard are buffered until the Repository receives the location of the Shard.

- Is this using a Stash? If so, isn't that an unreliable temporary queue?

It is not a Stash. It is an internal Vector per shard, i.e. an unreliable, temporary queue. End-to-end reliable messaging between client and Aggregate Root (I stick to the naming until we have changed it) should be done with reliable channels, which are provided by akka-persistence.
 
 
- If the location of the Shard is the Repository itself the message is delivered to child actor  representing the Aggregate Root, otherwise it forwards the message to the responsible Repository. The Repository caches the Shard locations.

- Not sure I follow this.

Scenario 1:
1. Incoming message M1 to repository instance R1.
2. M1 is mapped to shard S1. R1 doesn't know about S1, so it asks the coordinator C for the location of S1.
3. C answers that the home of S1 is R1.
4. R1 creates a child actor for Aggregate Root AR1 and sends the buffered messages for S1 to the AR1 child.
5. Thereafter all incoming messages to R1 for S1 can be handled by R1 without C. It creates AR children as needed, and forwards messages to them.

Scenario 2:
1. Incoming message M2 to R1.
2. M2 is mapped to S2. R1 doesn't know about S2, so it asks C for the location of S2.
3. C answers that the home of S2 is R2.
4. R1 sends the buffered messages for S2 to R2.
5. Thereafter all incoming messages to R1 for S2 can be handled by R1 without C. It forwards messages to R2.
6. R2 receives a message for S2, asks C, which answers that the home of S2 is R2, and we are in Scenario 1 but for R2.
 

 
- Aggregate Root child actors are created on demand from user provided Props. An Aggregate Root actor is typically a Processor or EventSourcedProcessor. The Aggregate Root can stop itself in case it is idle to reduce memory consumption.

- This is why, in my comments below, I think that this effort isn't really a Cache. It is more of a specialized Grid/Fabric.

Ok, I didn't intend to mean that it is a Cache. The shard locations are cached instead of asking the coordinator for each incoming message.
 
 
- Ordinary death watch is used between the Repositories and ShardCoordinator.
- The ShardCoordinator rebalances the shards, e.g. when new nodes join the cluster. This is the most difficult part. E.g. we must avoid message re-ordering when a Shard is migrated from one Repository to another. 

- Consider evicting Actors from the Shard/Cache/Region when a node is added. Eviction is based on a new hash calculation. When the next message is sent to one of the evicted Actors, the Actors is brought to life again but now on the new node.

Alright, that is pretty much how it is implemented. Eviction is called HandOff, and it is controlled by the coordinator.
 
 

It is important to highlight that in this solution I don't try to solve:
1. Distributed journal. That should be handled by a journal implementation backed by a distributed data/event store.
2. Reliable messaging between client and Aggregate Root. That should be solved by channels.
3. Better failure detection or management of network partitions than what is provided by Akka Cluster.

- Here are my general comments. I think that what is being developed here is not just for DDD. It is more a Grid.

I agree
 
It may not be so much a set of Caches because an Actor is transient. So maybe using the follow terms are more appropriate:

  -- Grid or Fabric defines the overall concept. I think that Grid is probably sufficient.
  -- Each Aggregate is associated with an "area" in the Grid. You could call this a Shard, but maybe it is more like a Region.
     It's a Cache, but not a Cache in the sense that Coherence uses Cache. So maybe Region is more versatile.
  -- Each Aggregate in a Region (Region is at least a placeholder) is represented as an Entry in the Region. Entries are referenced by clients by an EntryRef.
  -- If any of this seems acceptable, then you have a Region and RegionCoordinator, for example. Regions can be specifically named or can be named via an algorithm.
 
- I think that all of these are a better fit for Justin's concerns. That makes this approach much more applicable to all kinds of uses, not just domain models.


I'm convinced. It should be (and is) DDD agnostic and have different naming than I have used in the first prototype.

I kind of like Shard, which also is well known in distributed data stores.

Perhaps everything falls into place if we rename Repository to Region, and say Entry instead of Aggregate Root.
A Region is responsible for 0 or more Shards.
A Shard is a group of Entries
ShardCoordinator decides in which Region a Shard is located.

Thank you Vaughn for very valuable feedback!

Regards,
Patrik

 


Patrik Nordwall

unread,
Dec 3, 2013, 5:20:35 AM12/3/13
to akka...@googlegroups.com
I have updated the pull request based on your feedback. 
I renamed Aggregate Root to Entry and Repository to ShardRegion

Implemented missing things such as:
- Limit buffer size
- Support buffering during passivate
- Pluggable shard allocation strategy (and supporting Justin's use case also)
- Java API
- Proxy only mode
- Extension for ease of use (a rough usage sketch follows below)
- SharedLeveldbStore in test

Remaining:
- ScalaDoc and rst docs
- Snapshotting in ShardCoordinator
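
Roughly, using the extension looks like the sketch below (simplified and still subject to change, so names may differ slightly from what is in the PR):

    import akka.actor.{ Actor, ActorRef, ActorSystem, Props }
    import akka.contrib.pattern.{ ClusterSharding, ShardRegion }

    case class Get(counterId: Long)

    class Counter extends Actor {
      def receive = { case Get(id) => sender() ! 0 } // dummy entry for the sketch
    }

    // Extract the entry id and the message to deliver from an incoming message,
    // and map an incoming message to a shard id (10 shards here).
    val idExtractor: ShardRegion.IdExtractor = {
      case msg @ Get(id) => (id.toString, msg)
    }
    val shardResolver: ShardRegion.ShardResolver = {
      case Get(id) => (id % 10).toString
    }

    val system = ActorSystem("ClusterSystem") // cluster configuration assumed in application.conf
    ClusterSharding(system).start(
      typeName = "Counter",
      entryProps = Some(Props[Counter]), // None would give the proxy only mode
      idExtractor = idExtractor,
      shardResolver = shardResolver)

    val counterRegion: ActorRef = ClusterSharding(system).shardRegion("Counter")
    counterRegion ! Get(42) // routed to the entry for id 42, wherever it lives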


WDYT?

Andrew Easter

unread,
Dec 3, 2013, 11:30:47 PM12/3/13
to akka...@googlegroups.com
On Tuesday, 3 December 2013 02:20:35 UTC-8, patriknw wrote:
I have updated the pull request based on your feedback. 
I renamed Aggregate Root to Entry and Repository to ShardRegion

Implemented missing things such as:
- Limit buffer size
- Support buffering during passivate
- Pluggable shard allocation strategy (and supporting Justin's use case also)
- Java API
- Proxy only mode
- Extension for ease of use
- SharedLeveldbStore in test

Remaining:
- ScalaDoc and rst docs
- Snapshotting in ShardCoordinator


WDYT?

Very, very cool. This is properly exciting :-)

Andrew 

Justin du coeur

unread,
Dec 4, 2013, 8:01:54 AM12/4/13
to akka...@googlegroups.com
On Tue, Dec 3, 2013 at 11:30 PM, Andrew Easter <andrew...@gmail.com> wrote:
On Tuesday, 3 December 2013 02:20:35 UTC-8, patriknw wrote:
I have updated the pull request based on your feedback. 
[...]
WDYT?

Very, very cool. This is properly exciting :-)

Ditto.  I think this is going to prove a very important tool in the Akka stack... 

Jonas Bonér

unread,
Dec 4, 2013, 10:40:46 AM12/4/13
to akka...@googlegroups.com
Awesome job Patrik. 
Jonas Bonér
Phone: +46 733 777 123
Home: jonasboner.com
Twitter: @jboner

Raymond Roestenburg

unread,
Dec 4, 2013, 1:35:28 PM12/4/13
to akka...@googlegroups.com
Patrik does it again... :-)
Any idea when this will land in contrib in a reasonably solid state?


--
Raymond Roestenburg

Patrik Nordwall

unread,
Dec 5, 2013, 2:55:18 AM12/5/13
to akka...@googlegroups.com
Thank you guys. I will try to complete the docs next week, and then there is a review period. Anyway, it's very self-contained, so if you would like to try it out you can copy the code and use it with one of the latest timestamped Akka versions. It doesn't work with 2.3-M1.

Roland Kuhn

unread,
Dec 6, 2013, 12:02:28 PM12/6/13
to akka-dev
It took a while to work up the courage to tackle the review of this large pull request, but

THIS. IS. AWESOME.

I’m super excited about this. And the code is nice and clean, too. Thanks to all who contributed, which definitely includes Justin, Vaughn, Andrew, Martin and Patrik.

Regards,

Roland


Dr. Roland Kuhn
Akka Tech Lead
Typesafe – Reactive apps on the JVM.
twitter: @rolandkuhn


Vaughn Vernon

unread,
Dec 9, 2013, 12:45:44 PM12/9/13
to akka...@googlegroups.com
Hi Patrik,

Obviously if Roland is happy it must be great. Given that you implemented my suggestions, some of my similar work is probably redundant, and your understanding at the cluster level is best. Still, I think I may be able to kick in a few goodies. I don't want to just code review; I would prefer to try to use what you've done and see where my further contributions might fit in.

I have not used akka-cluster before. I assume you can run a single node cluster easily enough. Can I do some work on this without getting into major involvement in build and configuration knowledge? Could you describe the environment for this? I just want to follow a straight line from here to there.

Vaughn

Patrik Nordwall

unread,
Dec 9, 2013, 1:46:18 PM12/9/13
to akka...@googlegroups.com
Hi Vaughn,


On Mon, Dec 9, 2013 at 6:45 PM, Vaughn Vernon <vve...@shiftmethod.com> wrote:
Hi Patrik,

Obviously if Roland is happy it must be great. Given that you implemented my suggestions, some of my similar work is probably redundant, and your understanding at the cluster-level is best. Still I think I may still be able to kick in a few goodies. I don't want to just code review, I would prefer to try to use what you've done and see where my further contributions might fit in.

Sounds great.
 

I have not used akka-cluster before. I assume you can run a single node cluster easily enough. Can I do some work on this without getting into major involvement in build and configuration knowledge? Could you describe the environment for this? I just want to follow a straight line from here to there.

Then you should at least wait until this is merged to master and we have a nightly build. We will drop a milestone release (2.3-M2) early next week. That is probably a good time to try this. Yes, you can run it on a single machine and I will give you some kind of instructions to make it easy. I will let you know here when it is ready.
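
In the meantime, a single-node cluster on one machine can be as simple as this (rough sketch; the node just joins itself):

    import akka.actor.ActorSystem
    import akka.cluster.Cluster
    import com.typesafe.config.ConfigFactory

    val config = ConfigFactory.parseString("""
      akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
      akka.remote.netty.tcp.hostname = "127.0.0.1"
      akka.remote.netty.tcp.port = 2551
      """).withFallback(ConfigFactory.load())

    val system = ActorSystem("ClusterSystem", config)
    Cluster(system).join(Cluster(system).selfAddress) // forms the single-node cluster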

Thanks,
Patrik

Vaughn Vernon

unread,
Dec 9, 2013, 2:20:01 PM12/9/13
to akka...@googlegroups.com
Perfect.



Patrik Nordwall

unread,
Dec 16, 2013, 7:15:19 AM12/16/13
to akka...@googlegroups.com
2.3-M2 is released, and here is a runnable sample of the cluster sharding feature: https://github.com/typesafehub/activator-akka-cluster-sharding-scala
See instructions in 'tutorial/index.html'


Looking forward to your feedback!

Cheers,
Patrik