Bulletproofing HAppS

S. Alexander Jacobson

Jul 24, 2009, 12:47:45 PM
to HA...@googlegroups.com
I've been thinking through what it takes to get HAppS to industrial strength on reasonable network architectures.  The original multi-master idea was based on the assumption that reliable network attached storage was expensive/hard; that it was in some sense cheaper to use a local disk for each instance.  Network attached storage has gotten a lot cheaper/easier over the last four years.  Amazon Elastic Block Store makes it practically effortless for EC2 instances.  Given the added complexity of finishing the implementation and proving multi-master reliability, it may make sense now to switch to a master-slave architecture (which also allows us to eliminate the Spread dependency).

Please comment.

== Master-Slaves Operation for handling larger query volume ==

In master-slave operation, the master is the only server with access to the network attached filesystems that store checkpoints and logs.  The slaves do not need their own disk or access to this storage.  At start-up, slaves load state, log, and event streams from the master rather than from any disk.  Slaves handle state queries locally, but proxy updates to the master.  For testing comfort, at start-up, slaves should also attempt to write state and log to /dev/null as well as run any user-defined tests.

The assumed configuration here is a network shared ordered list of servers in that cluster.  A load balancing system distributes traffic to the servers on this list. 

The definition of master/slave is via an election.  Every n seconds, every server sends to all other servers a list of all the servers from which it received such a message in the last time block.  The master is defined to be the highest rank server that a majority received a list from in the last time block. 

To be clear, this has to be a majority of the servers on the list, not a majority of those reporting, or else weird corner cases emerge.  Therefore, a master is not defined until a majority of servers on the list are live and sending visibility reports.
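
A minimal sketch of the election rule described above, written as a pure function. The names (ServerId, Reports, electMaster) are made up for illustration, and it assumes the cluster list is ordered highest rank first and that a server's report may include itself:

```haskell
import Data.List (find)
import qualified Data.Map as Map
import qualified Data.Set as Set

-- Hypothetical identifier for a server on the shared cluster list.
type ServerId = String

-- For the last time block: each reporting server mapped to the set of
-- servers it says it received a visibility list from.
type Reports = Map.Map ServerId (Set.Set ServerId)

-- Pick the highest-ranked server that a majority of the *listed* servers
-- (not merely of those reporting) heard from. Returns Nothing until a
-- majority of the list is live and reporting.
electMaster :: [ServerId] -> Reports -> Maybe ServerId
electMaster cluster reports = find seenByMajority cluster
  where
    quorum  = length cluster `div` 2 + 1
    members = Set.fromList cluster
    -- Count only reports that come from servers on the list.
    votes s = length
      [ () | (reporter, heard) <- Map.toList reports
           , reporter `Set.member` members
           , s `Set.member` heard ]
    seenByMajority s = votes s >= quorum
```

With cluster ["a","b","c"], if only "b" and "c" report and each heard from "b" and "c", the result is "b"; with a single reporter no master is defined.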

== Safe Zero Downtime Cluster Migration ==

  1. Upgrade slaves one by one.  If they can get to current state, run user defined tests, and handle query volume without barfing, keep updating.
  2. Update the master last.

That being said, it would be nice to have faster detection of migration errors.   The system should be modified to write a hash of the type structure of state and logs at the head of state and log files and check that this hash matches at load time.  Bonus points for TH that (at compile time) writes the hashes to a file in the source hierarchy so you can find the code version that matches this particular saved state later.
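
As a rough illustration of the header-hash idea (not existing HAppS code), here is one way a type-structure hash could be computed with Data.Data. The names typeShape, typeHash and checkHeader and the two example state types are hypothetical. The sketch only covers the type name, constructor names and record field names; it does not recurse into field types, and it works for algebraic data types only:

```haskell
{-# LANGUAGE DeriveDataTypeable #-}
import Data.Bits (xor)
import Data.Char (ord)
import Data.Data (Data, Typeable, constrFields, dataTypeConstrs,
                  dataTypeName, dataTypeOf, showConstr)
import Data.List (foldl')
import Data.Word (Word64)

-- Two hypothetical versions of an application state type.
data AppStateV1 = AppStateV1 { users1 :: [String], hits1 :: Int }
  deriving (Data, Typeable)
data AppStateV2 = AppStateV2 { users2 :: [String], hits2 :: Int, motd2 :: String }
  deriving (Data, Typeable)

-- Textual description of a type's shape. The argument is never forced,
-- so a dummy value such as (undefined :: AppStateV1) is enough.
typeShape :: Data a => a -> String
typeShape x = dataTypeName dt ++ "{" ++ concatMap describe (dataTypeConstrs dt) ++ "}"
  where
    dt = dataTypeOf x
    describe c = showConstr c ++ "(" ++ unwords (constrFields c) ++ ");"

-- 64-bit FNV-1a style hash over the character codes of a string.
fnv1a :: String -> Word64
fnv1a = foldl' step 14695981039346656037
  where step h ch = (h `xor` fromIntegral (ord ch)) * 1099511628211

typeHash :: Data a => a -> Word64
typeHash = fnv1a . typeShape

-- Compare the hash stored at the head of a state or log file with the
-- hash of the type the running code expects.
checkHeader :: Data a => a -> Word64 -> Either String ()
checkHeader expected stored
  | stored == typeHash expected = Right ()
  | otherwise = Left ("type hash mismatch for " ++ dataTypeName (dataTypeOf expected))
```

A loader would call checkHeader before parsing the rest of the file, so a mismatched code version fails immediately rather than partway through a replay.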

==  Easy Amazon EC2 Provisioning Monad for Single Master operation ==

Most Haskell developers do not have easy access to multi-server data centers on which to develop.  However, Amazon, Rackspace and others now provide provisioning APIs, network load balancing, server monitoring, network storage in relatively easy to use packages.  It makes sense for HAppS to target these data center APIs.  I imagine a HAppS datacenter monad with the following functions:

> type ResultInfo = Either Error Success
> newState :: DC StateId -- creates state and log network storage volumes for the app (Elastic Block Store volumes on Amazon)
> copyState :: StateId -> DC StateId -- useful for testing
> delState :: StateId -> DC ResultInfo -- useful when done testing
> upState :: DomainName -> AppCode -> StateId -> DC ResultInfo -- start/upgrade app on state from a given domain name
> downState :: StateId -> DC ResultInfo -- shut down running cluster

Note that Amazon provides auto-scaling so you don't have to define the number of servers required for your app.  It also provides Elastic Load Balancing, so at this stage we don't need to write that code into HAppS.

== Sharding ==

With reliable HAppS on the horizon, we also need a path to scale our app data past the current maximum size of machines provided by Amazon, MediaTemple, Rackspace, etc.   My instinct is to define a standard sharding implementation for ixset.  We can add other data structures in the future.  

The architecture of an application is some components whose underlying data is sharded and some components whose data is global to all app instances.  Every item stored in a ShardedIxSet has an index on a 64-bit ObjectId.

I imagine a ShardManager component that is shared and provides a directory mapping n-bit prefixes of these ObjectIds to shard clusters.   The ShardManager monitors shards for each component.  If the shard is getting full or excessively busy, it splits it, otherwise it merges.  Splitting/merging IxSets on ObjectIds is very straightforward.  Splitting/merging live shard clusters is a little more complex but not crazily so.  The big issue is managing dispatch of updates so nothing is lost in the process.

The ShardedIxSet insert and delete functions use the directory to deliver instructions to the relevant shards.  We add get/put functions for per object changes.  The get/put functions use the directory to interface only with the relevant shard.  The update and query functions broadcast to all shards and block until all shards have responded.
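
To make the directory idea concrete, here is a small sketch of prefix-based lookup and splitting. Everything here (Directory, ShardId, prefixOf, lookupShard, splitShard) is a hypothetical name, and the representation, a map keyed by prefix length and prefix value, is just one assumption about how the ShardManager might store it:

```haskell
import Data.Bits (shiftR)
import qualified Data.Map as Map
import Data.Maybe (listToMaybe, mapMaybe)
import Data.Word (Word64)

type ObjectId = Word64

-- Hypothetical name of a shard cluster.
newtype ShardId = ShardId String deriving (Eq, Show)

-- Directory keyed by (prefix length in bits, prefix value).
type Directory = Map.Map (Int, Word64) ShardId

-- The top n bits of an ObjectId.
prefixOf :: Int -> ObjectId -> Word64
prefixOf n oid
  | n <= 0    = 0
  | n >= 64   = oid
  | otherwise = oid `shiftR` (64 - n)

-- Find the shard responsible for an ObjectId, trying the longest prefix first.
lookupShard :: Directory -> ObjectId -> Maybe ShardId
lookupShard dir oid =
  listToMaybe (mapMaybe (\n -> Map.lookup (n, prefixOf n oid) dir) [64, 63 .. 0])

-- Split the shard owning the n-bit prefix p into two shards that own the
-- (n+1)-bit prefixes p0 and p1. Assumes n < 64.
splitShard :: Int -> Word64 -> ShardId -> ShardId -> Directory -> Directory
splitShard n p lo hi =
  Map.insert (n + 1, p * 2) lo . Map.insert (n + 1, p * 2 + 1) hi . Map.delete (n, p)
```

get/put would call lookupShard with the object's id, while update and query would fan out over Map.elems of the directory.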

This form of sharding will likely require some mucking about in the component architecture, but I hope it is not so difficult to accomplish.


== Upgrading/Migrating Sharded Data ==

  1. Upgrade a slave on each shard
  2. Upgrade the master on each shard last. 
  3. If there are any failures, roll back the entire upgrade.

-Alex-

Lemmih

Jul 25, 2009, 11:22:21 AM
to HA...@googlegroups.com

I like this approach. It should be much simpler to implement than
multi-master. However, the latency involved with assigning a new
master is rather steep and could almost be described as a single point
of failure. Since upgrading the master is basically a controlled
crash, this is a sore point that would be felt often.

> == Safe Zero Downtime Cluster Migration ==
>
> Upgrade slaves one by one.  If they can get to current state, run user
> defined tests, and handle query volume without barfing, keep updating.
> Update the master last.
>
> That being said, it would be nice to have faster detection of migration
> errors.   The system should be modified to write a hash of the type
> structure of state and logs at the head of state and log files and check
> that this hash matches at load time.  Bonus points for TH that (at compile
> time) writes the hashes to a file in the source hierarchy so you can find
> the code version that matches this particular saved state later.

The migration system limits the usefulness of a simple hash, but I'm
sure we can find a satisfactory solution.

I'm having a hard time grokking this. The number of shards isn't
determined by the number of nodes in the cluster?
Here's what I understand so far:

* A 'cluster' is a collection of 'nodes'. Each 'node' is capable of
computations.
* A 'component' is the concept of a Haskell data-structure. It may
exist in fragments across the cluster although this wouldn't be
directly visible to the programmer.
* A 'shard' is a subset or slice of a 'component'. Shards are
"physical" manifestations of components and should be available (for
queries) on at least three nodes.
** I'm not sure how many replicas are needed. We do not need them to
prevent data loss anymore, but too few of them could lead to
interruptions of service.
* There are two distinct designations: Shard master and shard member.
Both designations are parameterized by shard number and component id.
* A 'node' can have multiple designations.
* A shard master must also be a shard member for the same shard and component.
* For each shard in every component, one (and only one) node must be
designated as 'shard master'.
* Computational obligations are determined by designations:
** For shard members: Apply queries locally, send updates to the
shard master, and apply updates from the shard master locally.
** For shard masters: Receive updates from shard members, apply
updates locally, and stream update events to the shard members after
they've been applied.

Since each shard master has to maintain an event log and checkpoint
file, each of them needs a separate block store. Managing that many
filesystems would be cumbersome.

--
Cheers,
Lemmih

S. Alexander Jacobson

Jul 26, 2009, 9:32:50 AM
to HA...@googlegroups.com
On 7/25/09 11:22 AM, Lemmih wrote:
> On Fri, Jul 24, 2009 at 6:47 PM, S. Alexander
> jacobson <al...@alexjacobson.com> wrote:
>
> I've been thinking through what it takes to get happs to industrial strength
> on reasonable network architectures.  The original multi-master idea was
> based on the assumption that reliable network attached storage was
> expensive/hard; that it was in some sense cheaper to a local disk for each
> instance.  Network attached storage has gotten a lot cheaper/easier over the
> last four years.  Amazon Elastic Block Store makes it practically effortless
> for EC2 instances.  Given the added complexity of finishing implementation
> and and proving multi-master reliability, it may make sense now to switch to
> master-slave architecture (which also allows us to eliminate the Spread
> dependency).
>
> I like this approach. It should be much simpler to implement than
> multi-master. However, the latency involved with assigning a new
> master is rather steep and could almost be described as a single point
> of failure. Since upgrading the master is basically a controlled
> crash, this is a sore point that would be felt often.

Why would the latency be high?  If the election period is, e.g., 0.5 seconds, then it takes about 1 second to discover that a majority can't see the master and, if the network is not segmented, for a slave that is already up-to-date simply to start handling updates itself.

A subtlety is that the new master has to know that it has gotten all updates.  A solution to that problem is for the broadcast lists also to include the last update received and for the new master to get the last update received by the surviving servers.
(We should also operate so that an update isn't ack-ed back to the client until it has been distributed to a majority of the machines -- this has latency but not throughput consequences)

Is there other latency that I am missing?

> == Safe Zero Downtime Cluster Migration ==
>
> Upgrade slaves one by one.  If they can get to current state, run user
> defined tests, and handle query volume without barfing, keep updating.
> Update the master last.
>
> That being said, it would be nice to have faster detection of migration
> errors.   The system should be modified to write a hash of the type
> structure of state and logs at the head of state and log files and check
> that this hash matches at load time.  Bonus points for TH that (at compile
> time) writes the hashes to a file in the source hierarchy so you can find
> the code version that matches this particular saved state later.
>
> The migration system limits the usefulness of a simple hash, but I'm
> sure we can find a satisfactory solution.

It seems like we can use Data.Data and Data.Typeable to give each full datatype a unique hash...?


  
> ==  Easy Amazon EC2 Provisioning Monad for Single Master operation ==
>
> Most Haskell developers do not have easy access to multi-server data centers
> on which to develop.  However, Amazon, Rackspace and others now provide
> provisioning APIs, network load balancing, server monitoring, network
> storage in relatively easy to use packages.  It makes sense for HAppS to
> target these data center APIs.  I imagine a HAppS datacenter monad with the
> following functions:
>
> type ResultInfo = (Either Error Success)
> newState::DC StateId -- creates state and log network storage volumes for
> the app (elastic block stores on Amazon)
> copyState::StateId -> DC StateId -- useful for testing
> delState::StateId -> DC ResultInfo -- useful when done testing
> upState::DomainName -> AppCode -> StateId -> DC ResultInfo --
> start/upgrade app on state from a given domain name
> downState::StateId -> EC2 (Either Error Success) -- shutdown running
> cluster
>
> Note that Amazon provides auto-scaling so you don't have to define a number
> of servers required for your app.  It also provided ElasticLoadBalancing so
> at this stage we don't need to write that code into HAppS.

Just to clarify the implicit data model here, I think of a running application in terms of the following components:

* Domain name
* Current application code
* Current state
* Cluster(s) of (virtual) servers/nodes.

I use cluster(s) plural for when we implement sharding or other forms of app segmentation.

Thinking further on the above API, I realize it does not really capture the workflows we want on Amazon:

* start new app with new state
* test future app code on sample state
* test future app code on current state
* push new app code to be current app code
* suspend (while in development before going actually live)

Which means that the app itself should expose an interface to get sample state from the current state as well as tests to run before promoting code to live.

Yes, node == (EC2) server.

> * A 'component' is the concept of a Haskell data-structure. It may
> exists in fragments across the cluster although this wouldn't be
> directly visible to the programmer.

If the user picks a shardable component then it may be fragmented across the cluster.
Components that are not explicitly identified as shardable should have a copy on every node.

> * A 'shard' is a subset or slice of a 'component'. Shards are
> "physical" manifestations of components and should be available (for
> queries) on at least three nodes.
>
> ** I'm not sure how many replicas are needed. We do not need them to
> prevent data loss anymore, but too few of them could lead to
> interruptions of service.

Yes, good practice is for any shard cluster to have 3 nodes.  Conceptually, if you don't care about downtime, shard clusters can have one node and the whole app can wait for Amazon to provision a new node to replace one that died.  There may be app contexts in which this tradeoff is reasonable, so we don't want to presuppose three nodes, only recommend it.

The nice thing about being a recommendation rather than requirement is that conceptually it means that we can make progress on sharding while making progress on replication.


> * There are two distinct designations: Shard master and shard member.
> Both designations are parameterized by shard number and component id.

I'm inclined to think that shards have IDs rather than numbers.  In future versions, we may want to designate shard cluster locations based on some info about the object (so we keep data from users in a particular location in the data center closest to that location).


> * A 'node' can have multiple designations.
>
> * A shard master must also be a shard member for the same shard and component.

What do these mean?


> * For each shard in every component, one (and only one) node must be
> designated as 'shard master'.

Yes.


> * Computational obligations are determined by designations:
> ** For shard members: Apply queries locally, send updates to the
> shard master, and apply updates from the shard master locally.
> ** For shard masters: Receive updates from shard members, apply
> updates locally, and stream update events to the shard members after
> they've been applied.

  
Also, for non-members of the shard, distribute queries/updates to all shards.  But get/put/post/delete only to the shard with the relevant objectId.

My instinct is not to require coherence at this time so if the app is concerned that not all shards have received an update query, it can ask again or handle at the application level in some way. 

That being said, it would be nice if the system provided some way to maximize the likelihood of full distribution of queries and updates so we are likely to want to use spread in some way here.

> Since each shard master has to maintain an event log and checkpoint
> file, each of them need a separate block store. Managing that many
> filesystems would be cumbersome.

Among the reasons that I like targeting Amazon Web Services is that their Elastic Block Store makes this really easy.  They have an API that provisions new block stores in real time.  See <http://aws.amazon.com/ebs/>

-Alex-

Lemmih

Jul 26, 2009, 10:56:41 AM
to HA...@googlegroups.com

To be slightly more precise, the new master has to know exactly which
update event was the last to be written to the log file.

> Is there other latency that I am missing?

I feared (and still do) that the new master would have to replay the event log.

I thought the idea of the master-slave approach would be to sidestep
sequencing issues by letting the master decide on the order to apply
updates. If each node has to determine the order of events
independently then we're right back at the multi-master approach.

In other words, we can't rely on the master for event ordering since
it cannot write a log entry and broadcast an event in a single atomic
operation.

> == Safe Zero Downtime Cluster Migration ==
>
> Upgrade slaves one by one.  If they can get to current state, run user
> defined tests, and handle query volume without barfing, keep updating.
> Update the master last.
>
> That being said, it would be nice to have faster detection of migration
> errors.   The system should be modified to write a hash of the type
> structure of state and logs at the head of state and log files and check
> that this hash matches at load time.  Bonus points for TH that (at compile
> time) writes the hashes to a file in the source hierarchy so you can find
> the code version that matches this particular saved state later.
>
>
> The migration system limits the usefulness of a simple hash, but I'm
> sure we can find a satisfactory solution.
>
>
> It seems like we can use Data.Data and Data.Typeable to give each full
> datatype a uinique hash....?

Sure, but a simple hash may not be that useful. Say you change some
sub-type in your data structure from 'String' to the newtype 'Name'.
Now the hash will change but you'd still be able to parse your old
data if you defined a migration from 'String' to 'Name'.
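
One way to reconcile a header hash with migrations, sketched under the assumption that each migration is recorded as a step from one type hash to the next (TypeHash, Migrations and loadable are hypothetical names, not part of the existing migration system): a saved file is loadable if its hash is the current one or if a chain of migrations leads from it to the current one.

```haskell
import qualified Data.Map as Map

-- Hypothetical: a type hash rendered as a string.
type TypeHash = String

-- Each defined migration, recorded as "from this hash to that hash".
type Migrations = Map.Map TypeHash TypeHash

-- A stored hash is acceptable if it equals the current hash or can reach
-- it by following migrations. The fuel counter guards against cycles.
loadable :: Migrations -> TypeHash -> TypeHash -> Bool
loadable migrations current = go (Map.size migrations + 1)
  where
    go :: Int -> TypeHash -> Bool
    go fuel h
      | h == current = True
      | fuel <= 0    = False
      | otherwise    = maybe False (go (fuel - 1)) (Map.lookup h migrations)
```

In the String-to-Name example, the old hash would map to the new one, so old state still loads while a file written by unrelated code is rejected up front.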

Just that masters also function as members for their respective
shards. That is, a node can't be a master if the shard isn't available
locally.

> * For each shard in every component, one (and only one) node must be
> designated as 'shard master'.
>
>
> yes.
>
> * Computational obligations are determined by designations:
> ** For shard members: Apply queries locally, send updates to the
> shard master, and apply updates from the shard master locally.
> ** For shard masters: Receive updates from shard members, apply
> updates locally, and stream update events to the shard members after
> they've been applied.
>
>
>
> Also, for non-members of the shard, distribute queries/updates to all
> shards.  But get/put/post/delete only to the shard with the relevant
> objectId.
>
> My instinct is not to require coherence at this time so if the app is
> concerned that not all shards have received an update query, it can ask
> again or handle at the application level in some way.
>
> That being said, it would be nice if the system provided some way to
> maximize the likelihood of full distribution of queries and updates so we
> are likely to want to use spread in some way here.
>
> Since each shard master has to maintain an event log and checkpoint
> file, each of them need a separate block store. Managing that many
> filesystems would be cumbersome.
>
>
>
> Among the reasons that I like targeting Amazon Web Services is that their
> Elastic Block Store makes this really easy.  The have an API that provisions
> new block stores in real time.  See <http://aws.amazon.com/ebs/>

Oh, excellent.

--
Cheers,
Lemmih

Lemmih

Jul 26, 2009, 12:08:54 PM
to HA...@googlegroups.com

Summary from chat with Alex:

Replaying events from the log /is/ necessary but not a big problem
since the number of new events is likely to be small. That is, when
the old master goes down, the new master reads the event log backwards
to find events that were recorded but not sent to the slaves.

So, let's recap:
* A single master is responsible for the order of update events. The
sequenced events are saved to the log file and then broadcast to the
slaves.
* Slaves try to stay current by applying the event stream from the master.
* If/When the master dies, a new master is elected.
* The new master replays the event log from the highest common event
ID that each slave received. This closes the gap between saving events
on disk and sharing them with the slaves.

--
Cheers,
Lemmih

Gregory Collins

Jul 26, 2009, 12:23:53 PM
to HA...@googlegroups.com
Lemmih <lem...@gmail.com> writes:

> Replaying events from the log /is/ necessary but not a big problem
> since the number of new events is likely to be small. That is, when
> the old master does down, the new master reads the event log backwards
> to find events that were recorded but not sent to the slaves.
>
> So, let's recap:
> * A single master is responsible for the order of update events. The
> sequenced events are saved to the log file and then broadcast to the
> slaves.
> * Slaves try to stay current by applying the event stream from the master.
> * If/When the master dies, a new master is elected.
> * The new master replays the events log from the highest common event
> ID that each slave received. This closes the gap between saving events
> on disk and sharing them with the slaves.

Am I right in concluding that you mean that writes are only permitted on
the master (a la MySQL)?

G
--
Gregory Collins <gr...@gregorycollins.net>

S. Alexander Jacobson

Jul 26, 2009, 1:01:15 PM
to HA...@googlegroups.com
On 7/26/09 12:08 PM, Lemmih wrote:
> Summary from chat with Alex:
>
> Replaying events from the log /is/ necessary but not a big problem
> since the number of new events is likely to be small. That is, when
> the old master does down, the new master reads the event log backwards
> to find events that were recorded but not sent to the slaves.
>
> So, let's recap:
> * A single master is responsible for the order of update events. The
> sequenced events are saved to the log file and then broadcast to the
> slaves.
> * Slaves try to stay current by applying the event stream from the master.
> * If/When the master dies, a new master is elected.
> * The new master replays the events log from the highest common event
> ID that each slave received. This closes the gap between saving events
> on disk and sharing them with the slaves.
>
>

A master is determined to be dead by election. The order of succession
is predefined for a cluster.

The election process is that every 500ms, each server broadcasts the
list of servers from which it received a broadcast in the prior 500ms as
well as its current highest log sequence number. The master is declared
dead if the majority of servers didn't receive a broadcast from it in
the last period.

When the top ranked slave takes over as master, it plays back the log
from the lowest sequence number received in the last broadcast.
However, a slave may request missing log entries if it was also down
during the last broadcast period.
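
A small sketch of the playback step, assuming each surviving server's last broadcast carried its highest log sequence number (the types and the names replayStart, eventsToReplay and missingFor are illustrative only):

```haskell
import qualified Data.Map as Map

type SeqNo = Int
type ServerId = String

-- Hypothetical log entry: a sequence number plus an opaque payload.
data Event = Event { seqNo :: SeqNo, payload :: String } deriving (Eq, Show)

-- The lowest of the highest sequence numbers reported by surviving servers
-- in the last broadcast period; every survivor has at least this much.
replayStart :: Map.Map ServerId SeqNo -> Maybe SeqNo
replayStart reports
  | Map.null reports = Nothing
  | otherwise        = Just (minimum (Map.elems reports))

-- Events the new master rebroadcasts from the shared log: everything after
-- the replay start, or the whole log if no reports were received.
eventsToReplay :: Map.Map ServerId SeqNo -> [Event] -> [Event]
eventsToReplay reports eventLog = case replayStart reports of
  Nothing -> eventLog
  Just s  -> filter ((> s) . seqNo) eventLog

-- Entries a slave that was down can request, given the last one it applied.
missingFor :: SeqNo -> [Event] -> [Event]
missingFor lastApplied = filter ((> lastApplied) . seqNo)
```

Slaves that are already ahead of the replay start would simply skip entries whose sequence numbers they have already applied.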

-Alex-

S. Alexander Jacobson

Jul 26, 2009, 1:08:46 PM
to HA...@googlegroups.com
I am not a current user of MySQL but my understanding is that it requires the client to know which server is the master and send updates to one server and queries to the others.

In the design we are discussing here, the framework takes care of that routing, and the app is shielded from knowing which node is the master and which are slaves.  All nodes look the same from the user perspective.  Writes are permitted on any node.  This is just a question of how the system does routing internally.

To be clear, as long as the routing is kept hidden from the user, the reason to choose multi-master versus master-slave has much more to do with expectations about the availability of reliable network attached storage rather than any performance issues.  Master-slave should actually have slightly better performance characteristics because it does not have to wait on an ACK from all the other servers.  The reason we designed for multi-master originally was that network attached storage was complex and expensive and so we preferred to achieve durability by having each server write a copy of the checkpoint and log to its own disk.  With the advent of Amazon Elastic Block Store and similar services, the balance has shifted so now Master-Slave seems like the preferred architecture internally.

-Alex-



Gregory Collins

Jul 26, 2009, 1:39:11 PM
to HA...@googlegroups.com
"S. Alexander Jacobson" <al...@alexjacobson.com> writes:

> I am not a current user of MySQL but my understanding is that it
> requires the client to know which server is the master and send
> updates to one server and queries to the others.

Right.

> In the design we are discussing here, the framework takes care of that
> routing and the app is protected from knowing the identity of the
> master rather than the slave. All nodes look the same from the user
> perspective. Writes are permitted on any node. This is just a
> question of how the system does routing internally.

But there still is a single master which is responsible for maintaining
an authoritative copy of the update log (albeit "behind the curtain")?


> To be clear, as long as the routing is kept hidden from the user, the
> reason to choose multi-master versus master-slave has much more to do
> with expectations about the availability of reliable network attached
> storage rather than any performance issues. Master-slave should
> actually have slightly better performance characteristics because it
> does not have to wait on an ACK from all the other servers. The
> reason we designed for multi-master originally was that network
> attached storage was complex and expensive and so we preferred to
> achieve durability by having each server write a copy of the
> checkpoint and log to its own disk. With the advent of Amazon Elastic
> Block Store and similar services, the balance has shifted so now
> Master-Slave seems like the preferred architecture internally.

OK. The reason I ask is that forcing writes to (ultimately) be shunted
through a single machine might result in a bottleneck under certain
conditions (jobs with very heavy write loads, for instance). My thinking
is that the desirability of multi-master is not necessarily a matter of
the reliability of network-attached storage, but rather that it allows
you to handle a write load heavier than a single machine can handle.

That alternative is not 100% appealing either, though: you then introduce
other difficult issues -- you need something like consistent hashing to
identify where a particular data item resides in the cluster, write
conflicts necessitate a read-repair scheme (i.e. vector clocks), without
a quorum protocol you give up ACID in favor of "eventual consistency", etc.

S. Alexander Jacobson

Jul 26, 2009, 10:19:06 PM
to HA...@googlegroups.com
Master-slave does not substantially change bottleneck properties.
No matter what, there is a total order on all updates to state (or later to a shard of state).
The only change is that you don't have to wait for other members to ACK before considering an update official.

Thinking about transaction semantics for sharding, here are my thoughts:
  1. Updates to sharded components will be paired with an optional query command.
  2. The updateCommand gets written to a global update log for all shards.
  3. The optional query is executed against all shards as of the sequence number of the corresponding updateCommand.
  4. The queryResults are then distributed to each shard along with the updateCommand.
  5. Each shard executes the updateCommand locally.

The good news about this is that we retain ACID semantics across state.  For example, the query can verify that two users still exist before adding each to the other's friend list.  The query can verify that the from bank account has money before initiating a transfer. etc.

The theoretical bad news is that worst case performance when you need full ACID semantics, is throughput limited to the network latency of this system.  But this worst case performance assumes that every update steps on every other update.  In practice, most of the time they don't (or else you would design the app differently for other reasons).  So in practice, a few optimizations  get you back to high throughput.
  1. Most updates do not have global dependency and therefore don't carry a query so don't impose this penalty
  2. The system can do speculative query execution and roll back updates only when query values actually change (assumption is that query execution time is much less than network latency).
  3. The programmer can provide functions that allow the system to know what sort of prior updates don't interfere.  For example, if no withdrawal or transfers occurred involving the relevant accounts since the speculative query was executed, the query results can be deemed good.  Note, I am skeptical that this actually helps much.  CPU power is sufficiently cheap that double query execution is a worthwhile trade-off against the programmer effort of providing these hints.

Does this make sense?  Do a distributed query that establishes the preconditions for local updates to each shard with speculative query/update execution to maintain throughput against network latency?
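
Here is a toy, single-process model of the query-then-update flow for the bank transfer example. Shards are plain maps, there is no network or log, and all names (Shard, Transfer, queryShard, applyOnShard, runTransfer) are invented for the sketch:

```haskell
import qualified Data.Map as Map
import Data.Maybe (mapMaybe)

-- Hypothetical shard contents: account name to balance.
type Shard = Map.Map String Int

data Transfer = Transfer { from :: String, to :: String, amount :: Int }

-- Step 3: the precondition query, run against every shard. Only the shard
-- holding the source account returns a balance.
queryShard :: Transfer -> Shard -> Maybe Int
queryShard t = Map.lookup (from t)

-- Step 5: each shard applies the update locally, given the global verdict
-- computed from the query results. Accounts not on this shard are untouched.
applyOnShard :: Bool -> Transfer -> Shard -> Shard
applyOnShard ok t shard
  | not ok    = shard
  | otherwise = Map.adjust (+ amount t) (to t)
              (Map.adjust (subtract (amount t)) (from t) shard)

-- Steps 3 to 5 together: query all shards, combine the results, then
-- distribute the verdict with the update to every shard.
runTransfer :: Transfer -> [Shard] -> [Shard]
runTransfer t shards = map (applyOnShard ok t) shards
  where ok = any (>= amount t) (mapMaybe (queryShard t) shards)
```

A transfer whose source account lacks funds leaves every shard unchanged, which is the cross-shard atomicity the query is meant to buy.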

-Alex-

Gregory Collins

Jul 26, 2009, 11:58:31 PM
to HA...@googlegroups.com
"S. Alexander Jacobson" <al...@alexjacobson.com> writes:

> Master-slave does not substantially change bottleneck properties. No
> matter what, there is a total order on all updates to state (or later
> to a shard of state).

Is that really true? It isn't obvious to me (but I'm pretty dense!)

Obviously if you presuppose ACID semantics then this has to be true. Of
course, in a distributed context maintaining the ACID semantics implies
one of a couple of conclusions (and apologies if this isn't an
exhaustive list, I'm not a domain expert here):

* you have a single master which maintains the authoritative update
log and is responsible for the total order you mentioned. Even if
the data itself is distributed/sharded across nodes in the cluster,
this implies a single choke point for updates to a metadata
repository. Here you spend less time in communication between nodes
but your write throughput is limited to how quickly the master can
perform metadata updates. If the master dies you have deep problems
to solve re: election of a new master.

* you allow writes to any node in the cluster, and the nodes use some
kind of consensus/quorum protocol (like Paxos) to enforce total
ordering in the metadata. Here you might be able to distribute write
load more evenly but Paxos has a lot of communication overhead
(which is why e.g. Google uses it so sparingly)

Another choice is to throw ACID out the window -- this allows you to
make great gains in write throughput, with the caveat that simultaneous
updates to the same data item can cause conflicts that need to be
detected and repaired somehow. You can detect conflicts using vector
clocks
(http://research.microsoft.com/users/lamport/pubs/time-clocks.pdf) but
then you have to fix the conflict somehow, and the right conflict
resolution policy depends on the data domain (i.e. for some problems
maybe you can merge, for some you might have to revert to an earlier
state and force the user to make a choice, etc.)

BTW: let me know if I'm blathering on about stuff everyone knows already
:)


> The only change is that you don't have to wait for other members to
> ACK before considering an update official.
>
> Thinking about transaction semantics for sharding, here are my
> thoughts:
>

> 1. Updates to sharded components will be paired with an optional
> query command.
> 2. The updateCommand gets written to a global update log for all
> shards.
> 3. The optional query is executed against all shards as of the
> sequence number of the corresponding updateCommand.
> 4. The queryResults are then distributed to each shard along with the
> updateCommand.
> 5. Each shard executes the updateCommand locally.


>
> The good news about this is that we retain ACID semantics across
> state. For example, the query can verify that two users still exist
> before adding each to the other's friend list. The query can verify
> that the from bank account has money before initiating a
> transfer. etc.

Maintaining ACID semantics here is very very tricky and there are many
pitfalls to deal with, I know I feel too stupid to tackle it at least!


> The theoretical bad news is that worst case performance when you need
> full ACID semantics, is throughput limited to the network latency of
> this system. But this worst case performance assumes that every
> update steps on every other update. In practice, most of the time
> they don't (or else you would design the app differently for other
> reasons). So in practice, a few optimizations get you back to high
> throughput.
>

> 1. Most updates do not have global dependency and therefore don't
> carry a query so don't impose this penalty
>
> 2. The system can do speculative query execution and roll back updates
> only when query values actually change (assumption is that query
> execution time is much less than network latency).
>
> 3. The programmer can provide functions that allow the system to know
> what sort of prior updates don't interfere. For example, if no
> withdrawal or transfers occurred involving the relevant accounts
> since the speculative query was executed, the query results can be
> deemed good. Note, I am skeptical that this actually helps much.
> CPU power is sufficiently cheap that double query execution is a
> worthwhile trade-off to programmer effort providing these hints.

To be honest I'm not quite sure I'm following you here -- this is a
pretty abstract discussion and I don't feel comfortable that I'm smart
enough to stay "tethered to reality" here (if that makes sense). I can't
say for sure, but my "sense-feeling" tells me that determining whether
one update will conflict with another in the general case is probably
NP-complete (please prove me wrong!)


> Does this make sense? Do a distributed query that establishes the
> preconditions for local updates to each shard with speculative
> query/update execution to maintain throughput against network latency?

Once you go into this speculative update territory you have to introduce
dirty (and difficult) details like rollback/read repair/etc. Your
likelihood for introducing data inconsistency/corruption also goes
through the roof -- there's a reason why we don't have multimaster MySQL
servers, for instance!

Matthew Elder

Jul 27, 2009, 11:33:01 AM7/27/09
to HA...@googlegroups.com
It may have already been addressed, but S3 does not guarantee consistency, only eventual consistency, AFAIK. So a locking mechanism (if any) would not work on S3. If only one server writes to the "event log" on S3 then this might work, but there is still no guarantee that when another node takes over the log after a master failure, what it reads is consistent with what the master originally wrote, so there might be some data loss in this scenario.

--
Need somewhere to put your code? http://patch-tag.com
Want to build a webapp? http://happstack.com

S. Alexander Jacobson

Jul 27, 2009, 2:37:37 PM7/27/09
to HA...@googlegroups.com
On 7/26/09 11:58 PM, Gregory Collins wrote:
> "S. Alexander Jacobson" <al...@alexjacobson.com> writes:
>
>> Master-slave does not substantially change bottleneck properties.  No
>> matter what, there is a total order on all updates to state (or later
>> to a shard of state).
>
> Is that really true? It isn't obvious to me (but I'm pretty dense!)

I think you are confusing the issue of multi-master/multi-slave with the issue of how transactions work with sharding.  These two issues are orthogonal.  I will recap on both here.

== Multi-master vs Multi-Slave ==

Current state is whatever total order on updates has been made durable.  With multimaster, durability is achieved by having multiple masters confirm writes to their local disks.  With master-slave, durability happens only on the master's disk and we trust that:

(a) The master's durable storage is synchronous and robust with respect to failure
(b) That, in the event of failure of the master, another machine can mount this storage and become master.

In multi-slave, the master's storage layer takes care of making sure that the log write has been sufficiently replicated.
In multi-master, spread-based communication among the masters takes care of it.

== S3 vs EC2 (for Matthew Elder) ==

We picked the multimaster architecture originally because reliable network attached storage was expensive and complex.  When Amazon introduced EC2, the only durable storage was its Simple Storage Service (aka S3).  S3 was an asynchronous mechanism, so you had no easy way to know when your write had been made durable, and we stuck with multimaster as the best architecture.

Now that Amazon has introduced Elastic Block Store, a synchronous file store that is easily remountable by any EC2 server, it seems preferable to focus on multi-slave and let EBS take care of durability.

== ACID Sharding ==

Let's go through the letters.

Atomicity: With sharding, an atomic transaction is a global query paired with a per-shard update.  Unlike with SQL, a single global query can do a wide variety of lookups and calculations, as can our update, because both are full-on Haskell functions.  As a result we don't need to interleave lots of different interactions and engage in the complex concurrency management forced on MySQL, Oracle, etc.  It is possible that there is functionality that is unachievable without the full ability to interleave arbitrary numbers of query and update commands into a single atom, but I am having trouble imagining what that would be.  The global query function combined with the local update appears to cover the vast majority of cases without causing undue implementation complexity.  I challenge you to come up with an interesting case that cannot be handled by a global query paired with a local update.

Consistency: With sharding, there is still a single log master for all shards.  Haskell purity guarantees consistency in the event of a log replay.  I am not worried at this point that log write throughput is an issue. 

Isolation: The query/update is defined to happen as of a particular log sequence number.  Haskell's laziness means that it is really easy to retain multiple generations of state so in the event that our speculative execution turns out to be wrong, we can go back.

Durability: There is a single master log written to reliable disk (Amazon EBS by default), however each shard will maintain its own checkpoints.  As a matter of implementation, I expect that we will have some discussion about having a discrete log server that is less integrated into app functionality.  Conceptually, durable logging can exist as a completely standalone cloud service... hmmm.....

> Obviously if you presuppose ACID semantics then this has to be true. Of
> course, in a distributed context maintaining the ACID semantics implies
> one of a couple of conclusions (and apologies if this isn't an
> exhaustive list, I'm not a domain expert here):
>
>   * you have a single master which maintains the authoritative update
>     log and is responsible for the total order you mentioned. Even if
>     the data itself is distributed/sharded across nodes in the cluster,
>     this implies a single choke point for updates to a metadata
>     repository. Here you spend less time in communication between nodes
>     but your write throughput is limited to how quickly the master can
>     perform metadata updates. If the master dies you have deep problems
>     to solve re: election of a new master.
  
What do you mean by metadata updates here?

>   * you allow writes to any node in the cluster, and the nodes use some
>     kind of consensus/quorum protocol (like Paxos) to enforce total
>     ordering in the metadata. Here you might be able to distribute write
>     load more evenly but Paxos has a lot of communication overhead
>     (which is why e.g. Google uses it so sparingly)
  
Paxos is complex to implement.  There are somewhat less chatty variants, but imposing the constraint of not allowing interleaved transactions makes life much simpler.  We can impose that constraint because our queries and updates are full-on functions in a Turing-complete language and not simple SQL commands.  Defining a single master log seems way easier.

To make this less abstract, let's assume we have a request stream like:

(q1,u1),(q2,u2),(q3,u3),(q4,u4),(q5,u5)

The slow approach is to distribute the stream to all shards, wait for query values [v1], distribute [v1] to all shards,  wait for the q2 query values [v2], distribute them, etc.  In this case, update throughput is the inverse of network latency.

The much faster approach is to do speculative execution as follows.

1. Distribute the 5 item stream to all shards
2. Receive back definitive v1 and speculative v2-v5 from each shard (assuming u1-u4 don't affect their value)
3. Distribute (Definitive [v1], Speculative [v2-v5], Changes []) to all shards.
4. Receive back definitive v2 and speculative changes, if any, for v3, v4, and v5.  In this example assume only v5 changed.
5. Distribute (Definitive [v2], Speculative [], Changes [v5]) to all shards

We have now completed 5 updates in 3 round trips.  With a longer stream in which the only change was v10, the advantage would be even greater.
The advantage depends on how often updates cause changes in the speculative list.

The worst case is indeed network latency.  But, in practice, you don't generally have a stream of conflicting updates.
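
A rough sketch of the bookkeeping behind speculative execution, as a pure single-process model with no network involved. `Request`, `definitive`, `speculative`, `stale` and the account example are all hypothetical names made up for illustration. It computes every query against the starting state, then reports which of those speculative values would have to be re-sent because an earlier update changed them:

```haskell
import qualified Data.Map.Strict as Map

-- A request pairs a query with an update that consumes the query's value.
data Request s v = Request
  { reqQuery  :: s -> v
  , reqUpdate :: v -> s -> s
  }

-- Definitive values: each query sees the state left by all earlier updates.
definitive :: s -> [Request s v] -> [v]
definitive _ [] = []
definitive s (Request q u : rest) =
  let v = q s
  in v : definitive (u v s) rest

-- Speculative values: every query runs against the starting state.
speculative :: s -> [Request s v] -> [v]
speculative s = map (\r -> reqQuery r s)

-- 1-based positions whose speculative value turned out to be wrong.
stale :: Eq v => s -> [Request s v] -> [Int]
stale s reqs =
  [ i
  | (i, spec, def) <- zip3 [1 ..] (speculative s reqs) (definitive s reqs)
  , spec /= def
  ]

type Accounts = Map.Map String Int

-- Read one account's balance, then credit another account with 10.
readThenCredit :: String -> String -> Request Accounts Int
readThenCredit from to = Request
  { reqQuery  = Map.findWithDefault 0 from
  , reqUpdate = \v -> if v >= 0 then Map.insertWith (+) to 10 else id
  }

-- Five requests in which only the last query reads something an
-- earlier update (the fourth) has written.
exampleStream :: [Request Accounts Int]
exampleStream =
  [ readThenCredit "a" "b"
  , readThenCredit "c" "d"
  , readThenCredit "e" "f"
  , readThenCredit "g" "a"
  , readThenCredit "a" "h"
  ]
```

Here `stale Map.empty exampleStream` picks out only the fifth request, which is the situation in the walkthrough where v5 is the one value that needs correcting. A stream of mutually conflicting requests would mark almost every position stale, which is the network-latency worst case.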


>> Does this make sense?  Do a distributed query that establishes the
>> preconditions for local updates to each shard with speculative
>> query/update execution to maintain throughput against network latency?
>
> Once you go into this speculative update territory you have to introduce
> dirty (and difficult) details like rollback/read repair/etc. Your
> likelihood for introducing data inconsistency/corruption also goes
> through the roof -- there's a reason why we don't have multimaster MySQL
> servers, for instance!
>
> G
  
I think multimaster is irrelevant to this topic, but either way, the reason you don't have ACID MySQL is that an SQL query or update is not as powerful as a Haskell function and simulating purity is difficult while Haskell gives it to you for free.

-Alex-

Matthew Elder

Jul 27, 2009, 5:01:28 PM7/27/09
to HA...@googlegroups.com

> == S3 vs EC2 (for Matthew Elder) ==
>
> We picked the multimaster architecture originally because reliable network attached storage was expensive and complex.  When Amazon introduced EC2, the only durable storage was its Simple Storage Service (aka S3).  S3 was an asynchronous mechanism so you had no easy way to know when your write had been made durable so we stuck with multimaster as the best architecture.
>
> Now that Amazon has introduced Elastic Block Store, a synchronous file store that is easily remountable by any EC2 server, it seems preferable to focus on multi-slave and let EBS take care of durability.

Ahh ok, I understand now, the durable synchronous storage is an EBS volume. I must have missed that somewhere. Thanks!

Gregory Collins

Jul 27, 2009, 5:49:02 PM7/27/09
to HA...@googlegroups.com
"S. Alexander Jacobson" <al...@alexjacobson.com> writes:

> == Multi-master vs Multi-Slave ==
>
> Current state is whatever total order on updates has been made
> durable. With multimaster, durability is achieved by having multiple
> masters confirm writes to their local disks. With master-slave,
> durability happens only on the master's disk and we trust that:

So let me see if I'm following you: for the purposes of this
conversation so far we're talking *only* about using replication to
ensure that each node in a cluster of happstack-state instances contains
an identical replica of the same data? (Putting issues of sharding/data
partitioning aside?)


> == ACID Sharding ==
>
> Let's go through the letters.
>
> Atomicity: With sharding, an atomic transaction is a global query
> paired with a per shard update. Unlike with SQL, a single global
> query can do a wide variety of lookups and calculations as can our
> update -- because both are full-on haskell functions. As a result we
> don't need to interleave lots of different interactions and engage in
> the complex concurrency management forced on MySQL, Oracle, etc.

What I don't understand here is how the data actually gets
sharded/partitioned. Take for example "Map k v". If I understand the
current system correctly (which is a long shot!), with a single node we
just store this map in memory, and checkpoint the entire thing to disk
periodically using the Serialize instance.

So if you want to shard a Map across your cluster, how is this done?


> Consistency: With sharding, there is still a single log master for all
> shards. Haskell purity guarantees consistency in the event of a log
> replay. I am not worried at this point that log write throughput is
> an issue.

So each "shard" could consist of e.g. a master and zero or more slave
replicas (keeping with your statement that replication is orthogonal to
sharding)? E.g. twelve machines could be configured as such?:

    M         M         M         M
    S         S         S         S
    S         S         S         S

 shard 1   shard 2   shard 3   shard 4


> * you have a single master which maintains the authoritative
> update log and is responsible for the total order you
> mentioned. Even if the data itself is distributed/sharded across
> nodes in the cluster, this implies a single choke point for
> updates to a metadata repository. Here you spend less time in
> communication between nodes but your write throughput is limited
> to how quickly the master can perform metadata updates. If the
> master dies you have deep problems to solve re: election of a
> new master.
>
> What do you mean by metadata updates here?

That's a good question. The proposed design we're talking about isn't
clear to me at all, so please endure my pedantic questions. If you have
N shards in your cluster, you probably want to distribute 1/Nth of the
write load to each shard in the cluster. By "metadata" I mean keeping
track of what data is written where, but I guess I've been
misunderstanding you -- you want all of the writes to be written to the
master transaction log?

I think this discussion would be a little clearer to me if the
implementation details were more concrete (could we have a constructive
proof please? :) )

S. Alexander Jacobson

Jul 27, 2009, 7:45:00 PM7/27/09
to HA...@googlegroups.com
On 7/27/09 5:49 PM, Gregory Collins wrote:
> "S. Alexander Jacobson" <al...@alexjacobson.com> writes:
>
>> == Multi-master vs Multi-Slave ==
>>
>> Current state is whatever total order on updates has been made
>> durable.  With multimaster, durability is achieved by having multiple
>> masters confirm writes to their local disks.  With master-slave,
>> durability happens only on the master's disk and we trust that:
>
> So let me see if I'm following you: for the purposes of this
> conversation so far we're talking *only* about using replication to
> ensure that each node in a cluster of happstack-state instances contains
> an identical replica of the same data? (Putting issues of sharding/data
> partitioning aside?)
  

The issue of multi-master vs multi-slave is orthogonal to that of the transaction semantics of sharding;
probably orthogonal to transaction semantics generally.

>> == ACID Sharding ==
>>
>> Let's go through the letters.
>>
>> Atomicity: With sharding, an atomic transaction is a global query
>> paired with a per shard update.  Unlike with SQL, a single global
>> query can do a wide variety of lookups and calculations as can our
>> update -- because both are full-on haskell functions.  As a result we
>> don't need to interleave lots of different interactions and engage in
>> the complex concurrency management forced on MySQL, Oracle, etc.
>
> What I don't understand here is how the data actually gets
> sharded/partitioned. Take for example "Map k v". If I understand the
> current system correctly (which is a long shot!), with a single node we
> just store this map in memory, and checkpoint the entire thing to disk
> periodically using the Serialize instance.
>
> So if you want to shard a Map across your cluster, how is this done?
  
My default assumption is that you want to shard an ixset rather than a map, but, if you want to shard a map, the natural approach is probably to hash on the key.  You then have a directory that maps the first n bits of the hashed key to a particular shard (or set of shards depending on how we tackle adding/removing shards).

You also need a filter for every update command that converts it to one command per shard.   For example, if you had an insertList command, it would convert that into one insertList per shard with the elements in each list being only those eligible to be inserted into that shard.  The map user does not see this directory.  It is used only by the filter commands.
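
A minimal sketch of such a directory and of the filter for an insertList command, restricted to `Int` keys. Everything here is an assumption made for illustration: the names `Directory`, `toyHash`, `bucket`, `shardFor`, `evenDirectory` and `splitInsertList`, the toy multiplicative hash, and the choice to take the low n bits of the hash rather than the first n:

```haskell
import Data.Bits ((.&.))
import qualified Data.Map.Strict as Map

type ShardId = Int

-- Directory from an n-bit hash bucket to the shard that owns it.
data Directory = Directory
  { dirBits   :: Int
  , dirShards :: Map.Map Int ShardId
  }

-- Toy hash for Int keys; a real system would use a proper hash function.
toyHash :: Int -> Int
toyHash k = k * 2654435761

-- Bucket of a key: the low dirBits bits of its hash.
bucket :: Directory -> Int -> Int
bucket dir k = toyHash k .&. (2 ^ dirBits dir - 1)

-- Look up the owning shard; Nothing means the directory has no entry.
shardFor :: Directory -> Int -> Maybe ShardId
shardFor dir k = Map.lookup (bucket dir k) (dirShards dir)

-- A directory that covers every bucket, spreading them round-robin.
evenDirectory :: Int -> Int -> Directory
evenDirectory bits nShards =
  Directory bits (Map.fromList [ (b, b `mod` nShards) | b <- [0 .. 2 ^ bits - 1] ])

-- The filter for insertList: one insert list per shard, input order kept.
-- Keys without a directory entry are dropped here for brevity.
splitInsertList :: Directory -> [(Int, v)] -> Map.Map ShardId [(Int, v)]
splitInsertList dir kvs =
  Map.fromListWith (flip (++))
    [ (shard, [kv]) | kv@(k, _) <- kvs, Just shard <- [shardFor dir k] ]
```

Splitting or merging shards would then amount to editing `dirShards`, which is why the directory itself could be treated as logged state.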


>> Consistency: With sharding, there is still a single log master for all
>> shards.  Haskell purity guarantees consistency in the event of a log
>> replay.  I am not worried at this point that log write throughput is
>> an issue.
>
> So each "shard" could consist of e.g. a master and zero or more slave
> replicas (keeping with your statement that replication is orthogonal to
> sharding)? E.g. twelve machines could be configured as such?:
>
>     M         M         M         M
>     S         S         S         S
>     S         S         S         S
>
>  shard 1   shard 2   shard 3   shard 4
  
Given that there is one global log master, I am not sure how meaningful it is to talk about per shard masters.
Any slave can supply a checkpoint for its shard.  I am also not entirely sure that the log master directly runs any application logic (though it would be the natural place to run the functions that split commands into their per shard versions).


>>>   * you have a single master which maintains the authoritative
>>>     update log and is responsible for the total order you
>>>     mentioned. Even if the data itself is distributed/sharded across
>>>     nodes in the cluster, this implies a single choke point for
>>>     updates to a metadata repository. Here you spend less time in
>>>     communication between nodes but your write throughput is limited
>>>     to how quickly the master can perform metadata updates. If the
>>>     master dies you have deep problems to solve re: election of a
>>>     new master.
>>
>> What do you mean by metadata updates here?
>
> That's a good question. The proposed design we're talking about isn't
> clear to me at all, so please endure my pedantic questions. If you have
> N shards in your cluster, you probably want to distribute 1/Nth of the
> write load to each shard in the cluster. By "metadata" I mean keeping
> track of what data is written where, but I guess I've been
> misunderstanding you -- you want all of the writes to be written to the
> master transaction log?
  
I appreciate the questions.  It is really helpful to think through these details.  In any case, I assume the reason for sharding is either that state is too large to fit in memory on a single machine or that updates to state are too computationally expensive to get done in real time.  I am however assuming that we are not throughput bound in logfile writes.  The web advertises SAN devices that handle 60,000 8KB I/O requests per second, which translates to roughly 480MB of data per second.  To give a flavor for that scale, Facebook has ~50m users per day.  Assume that they top out at 5m users per hour, which is about 83k users per minute or roughly 1,400 per second.  At 60,000 log writes per second, every one of those users could update about 40 times per second.

There is metadata that takes the form of the directory described above.  But, since we have a central logfile, there is no reason that commands that split/merge/replicate shards can't also be part of the global application log.  A checkpoint would involve a particular shard configuration as well as the checkpoints for each of those shards.

> I think this discussion would be a little clearer to me if the
> implementation details were more concrete (could we have a constructive
> proof please? :) )
  
Indeed!  And this discussion is super helpful in getting us there!

-Alex-

S. Alexander Jacobson

Jul 27, 2009, 7:46:29 PM7/27/09
to HA...@googlegroups.com

Sterling Clover

Jul 27, 2009, 9:31:26 PM7/27/09
to HA...@googlegroups.com
Relatedly, if anybody is interested in writing a generic library
implementing paxos[1] in Haskell, I'm sure that would find usage in
plenty of situations.

Cheers,
S.

[1] http://en.wikipedia.org/wiki/Paxos_algorithm#Multi-Paxos

Gregory Collins

Jul 28, 2009, 11:56:03 AM7/28/09
to HA...@googlegroups.com
"S. Alexander Jacobson" <al...@alexjacobson.com> writes:

> So let me see if I'm following you: for the purposes of this
> conversation so far we're talking *only* about using replication to
> ensure that each node in a cluster of happstack-state instances contains
> an identical replica of the same data? (Putting issues of sharding/data
> partitioning aside?)
>
> The issue of multi-master vs multi-slave is orthogonal to that of the
> transaction semantics of sharding; probably orthogonal to transaction
> semantics generally.

I'm not sure that's true, as we'll get to below re: Data.Map. Whether or
not you're sharding and how you're replicating has a huge impact on how
you have to design for transactional integrity (and vice versa).

Let's say you have a Data.Map in your data store, sharded across the
cluster. You have a request that comes in that updates the map:

l = map (\x -> (x,x)) $ take 100 $ iterate (+1) 1
m' = Map.union m $ Map.fromList l

and you want this to be committed as a transaction, all or nothing. That
update will be sharded across the cluster -- each node will get 1/Nth of
the keys. What happens if I receive a (more or less) simultaneous
request to compute the values for keys between (25,40) non-inclusive?
You'll have to map-reduce across the cluster to get this, which means
that each node needs to work relative to the same transaction log
sequence number across the entirety of the query.

I think that immediately establishes a requirement that each node has to
keep a buffer of the last N values (relative to the transaction log
number) because a distributed update might come in while you're trying
to satisfy a distributed query, and the data has to stay consistent
w.r.t. transactions. So I wouldn't say exactly that the replication
scheme is *orthogonal* to transaction semantics: a design decision in
one will have consequences for the other.
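
A small sketch of that buffer for one shard, assuming each update arrives tagged with its log sequence number. `Versions`, `initVersions`, `applyAt` and `queryAsOf` are hypothetical names. Because the states are immutable values, keeping several generations mostly shares structure rather than copying it:

```haskell
import qualified Data.Map.Strict as Map

type SeqNo = Int

-- Recent versions of one shard's state, keyed by log sequence number.
data Versions s = Versions
  { keepLast :: Int
  , versions :: Map.Map SeqNo s
  }

-- Start with a single version at the given sequence number.
initVersions :: Int -> SeqNo -> s -> Versions s
initVersions n sq s = Versions n (Map.singleton sq s)

-- Apply an update at a new sequence number on top of the newest version,
-- then drop the oldest versions that fall outside the window.
applyAt :: SeqNo -> (s -> s) -> Versions s -> Versions s
applyAt sq f (Versions n vs) =
  case Map.lookupMax vs of
    Nothing          -> Versions n vs
    Just (_, latest) ->
      let vs'    = Map.insert sq (f latest) vs
          excess = Map.size vs' - n
      in Versions n (Map.drop (max 0 excess) vs')

-- Run a query against the newest version at or before the given sequence
-- number; Nothing means that version has already left the window.
queryAsOf :: SeqNo -> (s -> r) -> Versions s -> Maybe r
queryAsOf sq q (Versions _ vs) = q . snd <$> Map.lookupLE sq vs
```

A distributed query would carry one sequence number and call `queryAsOf` with it on every shard, so all shards answer from the same point in the log even if later updates have already been applied.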


> What I don't understand here is how the data actually gets
> sharded/partitioned. Take for example "Map k v". If I understand the
> current system correctly (which is a long shot!), with a single node we
> just store this map in memory, and checkpoint the entire thing to disk
> periodically using the Serialize instance.
>
> So if you want to shard a Map across your cluster, how is this done?
>
> My default assumption is that you want to shard an ixset rather than a
> map, but, if you want to shard a map, the natural approach is probably
> to a hash on the key. You then have a directory that maps the first n
> bits of key to a particular shard (or set of shards depending on how
> we tackle adding/removing shards).
>
> You also need a filter for every update command that converts it to
> one command per shard. For example, if you had an insertList command,
> it would convert that into one insertList per shard with the elements
> in each list being only those eligible to be inserted into that shard.
> The map user does not see this directory. It is used only by the
> filter commands.

Well we would want to be able to shard any kind of data type that
conforms to some typeclass interface(s), the same way it is now. Right
now the bar is pretty low, if you can serialize & version a datatype
then you can store it in state.

Re: Map, once you splat the keys across the cluster, you'll need to
map-reduce to satisfy most queries (e.g. a range query). You'll also
have to write a special-purpose datatype wrapper around Map that is
aware of the sharding issues. Clearly the fantasy of being able to deal
with run-of-the-mill pure haskell values has to go out the window also,
you only get to deal with datatype operations that know what to do w.r.t
sharding.

I guess that the typeclass interface you need to write to will need to
specify all of these details. What would it look like for Data.Map? For
list? For Data.Set? For a directed graph? What's the minimal set of
operations you would need to shard an arbitrary datatype? Does such a
set exist?

In other words: my hypothesis is that every function operating on
datatypes in sharded state needs to be expressible in terms of
map-reduce.
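
As an illustration of that hypothesis, here is a sketch of the (25,40) range query from earlier written as a map step per shard plus a reduce step. It assumes the 100 keys are split by key modulo the shard count, which is just one possible partitioning, and the names are made up for the example:

```haskell
import qualified Data.Map.Strict as Map

type Shard = Map.Map Int Int

-- Map step, run on each shard: entries with lo < key < hi (bounds exclusive).
rangeOnShard :: Int -> Int -> Shard -> Shard
rangeOnShard lo hi = Map.filterWithKey (\k _ -> lo < k && k < hi)

-- Reduce step: shards hold disjoint keys, so a union combines the parts.
rangeQuery :: Int -> Int -> [Shard] -> Shard
rangeQuery lo hi = Map.unions . map (rangeOnShard lo hi)

-- Keys 1..100, each mapped to itself, split across n shards by key mod n.
exampleShards :: Int -> [Shard]
exampleShards n =
  [ Map.fromList [ (k, k) | k <- [1 .. 100], k `mod` n == i ]
  | i <- [0 .. n - 1]
  ]
```

With hash partitioning every shard has to take part in a range query, since neighbouring keys land on different shards. A range-partitioned directory would let the query skip shards, at the cost of less even load.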


> Given that there is one global log master, I am not sure how
> meaningful it is to talk about per shard masters. Any slave can
> supply a checkpoint for its shard.

What does it mean to have a global log master? I think of a "transaction
log" as a series of commands {c_1,...,c_n} that transform a state s_0 to
a state s_n by

s_n = c_n(c_{n-1}(...(c_1(s_0))...))

Given that our commands are arbitrary haskell functions, how do we
"replay" a transaction log for node 14 in our cluster? How is the
transaction log represented on disk? I don't think that the way it's
done now in -State (the UpdateEvent stuff) is quite going to be enough.
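
One possible answer, sketched under the assumption that the log stores serializable descriptions of commands rather than the functions themselves. The `Command` type, the `Show`/`Read` text format and the function names are all made up for illustration, and this is not a description of how the UpdateEvent machinery works:

```haskell
import Data.List (foldl')
import qualified Data.Map.Strict as Map

-- Hypothetical command type: the log holds data, not closures.
data Command
  = Insert Int Int
  | Delete Int
  deriving (Eq, Show, Read)

type State = Map.Map Int Int

-- Interpret one logged command as a pure state transition.
applyCommand :: State -> Command -> State
applyCommand s (Insert k v) = Map.insert k v s
applyCommand s (Delete k)   = Map.delete k s

-- Replay applies the commands in log order, c_1 first, as a left fold.
replay :: State -> [Command] -> State
replay = foldl' applyCommand

-- Toy on-disk format: one command per line via Show and Read.
encodeLog :: [Command] -> String
encodeLog = unlines . map show

decodeLog :: String -> [Command]
decodeLog = map read . lines
```

Replaying the log for a single node would then mean running each command through that node's partition filter first and folding only the part that belongs to it.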

> I think this discussion would be a little clearer to me if the
> implementation details were more concrete (could we have a constructive
> proof please? :) )
>
> Indeed! And this discussion is super helpful in getting us there!

Some pictures wouldn't hurt either, it's difficult to conceptualize the
design without them.

G.
--
Gregory Collins <gr...@gregorycollins.net>

Thomas DuBuisson

Jul 28, 2009, 12:47:42 PM7/28/09
to HA...@googlegroups.com
Sterling Clover:

>
> Relatedly, if anybody is interested in writing a generic library
> implementing paxos[1] in Haskell, I'm sure that would find usage in
> plenty of situations.

Funny you should mention that - I had recently read a little on Paxos
for another Haskell library.

For my needs the main issue is that Paxos (as described on Wikipedia)
doesn't offer a method to verify the results of the acceptors without
either n*m messages between the acceptors and learners or m messages
from the proposer to all the learners with n signatures (one from each
acceptor). I did the math and learned the second way would mean a
2000 - 3000 byte message, which I felt was excessive given the
frequency of the operation, and the first way is up to 225 messages
(don't get me started about the latency etc).

If I could convince myself that there is a decent verification scheme
then I might write Paxos myself.

Thomas

Daniel B Giffin

Jul 28, 2009, 5:45:07 PM7/28/09
to HA...@googlegroups.com
Sterling Clover wrote:
> Relatedly, if anybody is interested in writing a generic library
> implementing paxos[1] in Haskell, I'm sure that would find usage in
> plenty of situations.

a colleague of mine who does research in distributed systems
(and is thus familiar with the details of paxos) is trying
to do this as a sort of "hello world" experiment while
learning haskell.

a beginner myself, i'm trying to help out. so far, i've
used the FFI to get access to the ioctl() calls necessary to
discover the broadcast addresses of local network
interfaces.

if we can get it anywhere near production quality, i'll let
you all know.

daniel

S. Alexander Jacobson

Jul 28, 2009, 9:05:39 PM7/28/09
to HA...@googlegroups.com
I am not sure I understand the function you are describing here.   Can you give a more motivated example in terms of real life need?

To clarify, I am claiming that most any real-world behavior can be accomplished by a pair of functions of the following form:
> queryToAllShards shardState result::shardState->result
> perShardUpdate shardState-> result -> r2::[result]->shardState->shardState->r2
>
> shardUpdate result shardState r2::queryToAllShards shardState result->
>                                   perShardUpdate shardState result r2->
>                                   IO [r2]

The system distributes the query to all shards and collects the result.  Each shard then gets a copy of the global result which it can use to decide how to make local updates.
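
One way to read those signatures as compilable Haskell, offered as a guess rather than a specification. It assumes the per-shard update returns both the new shard state and a reply, and it models the whole exchange as a pure function over a list of shards instead of an `IO` action over the network. The bank transfer and every name in it are illustrative:

```haskell
import Data.Maybe (isJust, mapMaybe)
import qualified Data.Map.Strict as Map

-- Assumed shapes for the two halves of a sharded transaction.
type QueryToAllShards shardState result = shardState -> result
type PerShardUpdate shardState result r2 =
  [result] -> shardState -> (shardState, r2)

-- Run the query on every shard, hand every shard the full list of
-- results, and collect the new states and per-shard replies.
shardUpdate
  :: QueryToAllShards s result
  -> PerShardUpdate s result r2
  -> [s]
  -> ([s], [r2])
shardUpdate query update shards =
  let results = map query shards
  in unzip (map (update results) shards)

type Accounts = Map.Map String Int

-- Transfer between two accounts that may live on different shards.
transfer :: String -> String -> Int -> [Accounts] -> ([Accounts], [Bool])
transfer from to amount = shardUpdate query update
  where
    -- Each shard reports the balances it holds for the two accounts.
    query shard = (Map.lookup from shard, Map.lookup to shard)
    -- Every shard reaches the same decision from the same global result,
    -- then adjusts only the accounts it actually holds.
    update results shard
      | ok        = (credit (debit shard), True)
      | otherwise = (shard, False)
      where
        debit  = Map.adjust (subtract amount) from
        credit = Map.adjust (+ amount) to
        ok     = any (isJust . snd) results
                   && any (>= amount) (mapMaybe fst results)
```

Because every shard sees the same list of results, each one makes the same commit-or-skip decision without a further round of coordination, which seems to be the property the model relies on.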

This model definitely handles typical sharded transaction problems  like reciprocal friend relations and bank balance transfers.  Can you give me a real life application need for which this model is incompatible?

The logging model for this is that a total order is put on these pairs by the master log.
My further claim is that optimistic concurrency (speculative execution of queries) allows this model to perform reasonably well because most intervening updates are unlikely to change any particular query result.  And if you are having performance problems, there are straightforward ways to rearchitect so you don't.

-Alex-

Gregory Collins

Jul 29, 2009, 11:59:11 AM7/29/09
to HA...@googlegroups.com
"S. Alexander Jacobson" <al...@alexjacobson.com> writes:

> I am not sure I understand the function you are describing here. Can
> you give a more motivated example in terms of real life need?

An example: "give me a list of the widgets recorded to state in the past
three days" -- a range query. Or "who are the top ten users this month?"

For the purposes of this discussion I've been focussing on "Map Int Int"
because if we can't come up with a scheme that works for that, more
complicated datatypes would be hopeless.


> To clarify, I am claiming that most any real-world behavior can be
> accomplished by a pair of functions of the following form:
>
>> queryToAllShards shardState result::shardState->result
>> perShardUpdate shardState-> result -> r2::[result]->shardState->shardState->r2
>>
>> shardUpdate result shardState r2::queryToAllShards shardState result->
>> perShardUpdate shardState result r2->
>> IO [r2]

The syntax is a little weird there, it's hard to see what you're getting
at. I think the requirements for a datatype to be "shardable" (besides
being serializable of course) are:

* a reduction function for query results -- you need to be able to send
the same query to every shard in the cluster and combine the
results. Which reduction function you use might depend on the query --
"top ten elements" would need a different reducer than "widgets
created yesterday".

* you need to be able to record a description of updates on the datatype
to a transaction log in an efficient binary serialization format.

* you need a partition function that, given a description of the nodes
in the cluster and a description of a data update, will give you a
list of data updates to be applied to each shard.
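
A rough sketch of how those three requirements might be bundled for "Map Int Int". All names are hypothetical, the `Show`/`Read` encoding stands in for a real binary format, and the modulo partition assumes a non-empty list of shards:

```haskell
import Data.List (sortBy)
import Data.Ord (Down (..), comparing)
import qualified Data.Map.Strict as Map
import Text.Read (readMaybe)

type ShardId = Int

-- Log encoding plus a partition function for one kind of update.
-- 'upd' is a serializable description of an update, not a function.
data Sharding upd = Sharding
  { encodeUpdate    :: upd -> String
  , decodeUpdate    :: String -> Maybe upd
  , partitionUpdate :: [ShardId] -> upd -> Map.Map ShardId upd
  }

-- A query carries its own reducer, since different queries combine
-- per-shard answers differently.
data Query shard r = Query
  { runOnShard :: shard -> r
  , reduce     :: [r] -> r
  }

runQuery :: Query shard r -> [shard] -> r
runQuery q = reduce q . map (runOnShard q)

newtype InsertList = InsertList [(Int, Int)]
  deriving (Eq, Show, Read)

-- Sharding for a bulk insert: each pair goes to the shard picked by
-- key modulo the number of shards, keeping input order within a shard.
mapSharding :: Sharding InsertList
mapSharding = Sharding
  { encodeUpdate    = show
  , decodeUpdate    = readMaybe
  , partitionUpdate = \shards (InsertList kvs) ->
      Map.fromListWith merge
        [ (shards !! (k `mod` length shards), InsertList [kv])
        | kv@(k, _) <- kvs
        ]
  }
  where
    merge (InsertList new) (InsertList old) = InsertList (old ++ new)

-- "Top n values": each shard returns its own top n, and the reducer
-- takes the top n of the combined candidates.
topN :: Int -> Query (Map.Map Int Int) [Int]
topN n = Query
  { runOnShard = best . Map.elems
  , reduce     = best . concat
  }
  where
    best = take n . sortBy (comparing Down)
```

Whether these pieces belong in a typeclass or in plain records like this is an open design choice; records make it easy to give one datatype several partitioning schemes.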


> The system distributes the query to all shards and collects the
> result. Each shard then gets a copy of the global result which it can
> use to decide how to make local updates.

In your thinking, what are "query" and "result" and how are they
represented over the wire? What's a "global result"?


> This model definitely handles typical sharded transaction problems
> like reciprocal friend relations and bank balance transfers. Can you
> give me a real life application need for which this model is
> incompatible?

Without more concrete details about exactly what is going over the wire,
and in what order, I think it's impossible for me to make an assessment.


> The logging model for this is that a total order is put on these pairs
> by the master log. My further claim is that optimistic concurrency
> (speculative execution of queries) allows this model to perform
> reasonably well because most intervening updates are unlikely to
> change any particular query result. And if you are having performance
> problems, there are straightforward ways to rearchitect so you don't.

I still don't have a good handle on your proposed scheme. Let's shelve
discussions about complicated things like speculative updates until we
can agree on what the basics might look like. Some diagrams would really
help. Some questions for which I don't feel I know an answer:

* what are the requirements on a datatype in order for it to be
shardable? (i.e. which type classes must it implement)

* how are transactions handled? Given that data is partitioned into
(presumably) disjoint sets, how do you ensure that a query or update
against the cluster is applied to a consistent version of data?
Given that each node may have a different idea of what the "most
recent transaction" is (if an update's been replicated to node 3 but
not node 5, for instance), how do you maintain the "latest
transaction id" that is consistent across the cluster -- without
forcing all queries or updates through a single machine?

* what would a network diagram look like for a key lookup on "Map Int
Int"? For a range query? For a bulk item update? What's sent over
the wire for each of these (both data & transaction information)?

* what exactly is put to disk, and when? How do you ensure data
consistency across the cluster in the context of shutdown/restart?
Will you be able to support datasets larger than memory, and if so,
how?

* how do the sharding functions change when you add/remove nodes? How
much data do you have to migrate if you add/remove a node, and how
do you migrate it?

S. Alexander Jacobson

Jul 29, 2009, 10:23:53 PM7/29/09
to HA...@googlegroups.com
Before we discuss ACID HAppS sharding, it seems useful to recapitulate how basic single server ACID HAppS works:
  • Atomic update functions are defined by the user.  $mkMethods converts these functions into serializable commands.
  • Consistency is achieved by relying on the type system and the state monad.
  • Isolation happens because the system consolidates all updates into a single thread (putting a total order on updates).
  • Durability is achieved by logging commands to disk before applying them to state.  Checkpoints are made periodically so as to avoid having to replay logs from the beginning of time.
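
As a rough illustration of the durability bullet, here is a pure model of "log first, then apply" plus checkpoint recovery.  It is only a sketch under assumptions: the log and checkpoint are in-memory values rather than files, and Command, Server, Checkpoint, update and recover are hypothetical names, not HAppS API.

```haskell
import qualified Data.Map as Map

type State = Map.Map Int Int

-- Commands are plain serializable data, as $mkMethods would produce.
data Command = Put Int Int | Remove Int
  deriving (Show, Read, Eq)

apply :: Command -> State -> State
apply (Put k v)  = Map.insert k v
apply (Remove k) = Map.delete k

-- The server: the log written so far and the state it has produced.
data Server = Server { logged :: [Command], current :: State }

-- A checkpoint remembers how many log entries its state already reflects.
data Checkpoint = Checkpoint { upTo :: Int, saved :: State }

-- The single update thread: append the command to the log, then apply it.
update :: Command -> Server -> Server
update c (Server l s) = Server (l ++ [c]) (apply c s)

checkpoint :: Server -> Checkpoint
checkpoint (Server l s) = Checkpoint (length l) s

-- Recovery replays only the log entries written after the checkpoint.
recover :: Checkpoint -> [Command] -> State
recover (Checkpoint n s) l = foldl (flip apply) s (drop n l)
```

Recovering from any checkpoint, including the empty one at position 0, should give the same state as the live server; a later checkpoint just means less replay.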
The problem with single server HAppS is the risk of downtime in the event of machine malfunction, the downtime from having to reboot on code changes, and the risk that query volume will overrun the capacity of a single machine to handle.  The solution is multi-server unsharded HAppS.

Multi-server unsharded HAppS will modify the above so that slaves deliver update commands to a log master.  The log master then makes these commands durable before distributing them to the slaves maintaining state.  The slaves are always waiting for the next sequenced update to arrive from the log master.  Slaves handle queries against whatever version of state they happen to be holding.  (Multimaster actually was this same model except that the "master" was an emergent property of the cluster).
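
A minimal sketch of that flow, with the network left out.  Master, Slave, submit and receive are hypothetical names, and parking entries that arrive early is an assumption of this sketch (the model above only says that slaves wait for the next sequenced update).

```haskell
import qualified Data.Map as Map

type SequenceNumber = Int

-- The log master numbers each command and records it before handing it out.
data Master c = Master
  { nextSeq :: SequenceNumber
  , durable :: [(SequenceNumber, c)]
  }

submit :: c -> Master c -> ((SequenceNumber, c), Master c)
submit c (Master n l) = (entry, Master (n + 1) (l ++ [entry]))
  where entry = (n, c)

-- A slave holds the state produced by all entries below `expecting`.
-- Entries that arrive early are parked until their turn comes.
data Slave s c = Slave
  { expecting :: SequenceNumber
  , state     :: s
  , parked    :: Map.Map SequenceNumber c
  }

receive :: (c -> s -> s) -> (SequenceNumber, c) -> Slave s c -> Slave s c
receive apply (n, c) sl
  | n < expecting sl = sl  -- already applied: ignore the duplicate
  | otherwise        = drain apply sl { parked = Map.insert n c (parked sl) }

-- Apply parked entries for as long as the next expected one is present.
drain :: (c -> s -> s) -> Slave s c -> Slave s c
drain apply sl = case Map.lookup (expecting sl) (parked sl) of
  Nothing -> sl
  Just c  -> drain apply sl { expecting = expecting sl + 1
                            , state     = apply c (state sl)
                            , parked    = Map.delete (expecting sl) (parked sl)
                            }
```

Queries on a slave simply read `state`, i.e. whatever version the slave happens to hold.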

Although an improvement, multi-server unsharded leaves us with two other scaling risks:
  • state gets too large to fit in the memory of a single machine.
  • too many updates for any of the machines to handle.
The solution is to divide up state into shards so that no individual machine needs to hold the entire state itself and so different updates are handled by different shards.  The second constraint is somewhat blurry because it depends both on the workload of updating and on the workload of deciding which shards need to update.  If we give enough information for shards to decide whether a log entry is relevant enough to process and assume that filtering log entries on that basis is lightweight, then we need not abandon the concept of a single master update log.

Let's presuppose that shardable state is always a collection of objects of some type.  Further, we assume that these objects are indexed on some key.  So let's sketch:
> class (Ord k,Eq k) => ObjectId o k where
>    objectId :: o -> k
> 
> class (ObjectId o k,Component (c o)) => Shardable (c o) where
>    empty :: c o
>    divide :: c o -> NumShards -> Index -> c o -- become the Index shard out of numshards after split
>    merge :: [c o] -> c o
>    range :: c o -> (k,k) 
>
> class (ObjectId o k,Bounded k,Shardable (c o)) => ShardUpdateCommand u (c o) where
>    getRanges :: u -> [(k,k)]
>    getRanges u = [(minBound,maxBound)] -- default is global broadcast
>    -- Note: the semantics of this need cleanup but I hope the idea makes sense

So the difference between an unsharded update and a sharded update is that a sharded update can designate the range of shards to which it should be delivered, whereas an unsharded update is delivered to all of them.  For example, a command to insert an object should be delivered only to the shards that encompass that objectId.
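
Here is a minimal sketch of that delivery rule on concrete types rather than the classes above.  Shard, Update, rangesOf, relevant and deliver are hypothetical names; rangesOf plays the role of getRanges, and Int keys with inclusive ranges are an assumption.

```haskell
import qualified Data.Map as Map

type Key = Int
type Range = (Key, Key)  -- inclusive bounds

data Shard = Shard { owns :: Range, objects :: Map.Map Key String }
  deriving (Show, Eq)

data Update
  = Insert Key String  -- concerns exactly one key
  | ClearAll           -- concerns every shard
  deriving (Show, Eq)

-- Stand-in for getRanges: which key ranges does this update touch?
rangesOf :: Update -> [Range]
rangesOf (Insert k _) = [(k, k)]
rangesOf ClearAll     = [(minBound, maxBound)]  -- global broadcast

overlaps :: Range -> Range -> Bool
overlaps (a, b) (c, d) = a <= d && c <= b

-- The cheap range check a shard does to decide whether an update matters.
relevant :: Update -> Shard -> Bool
relevant u sh = any (overlaps (owns sh)) (rangesOf u)

-- Shards outside the update's ranges leave their state untouched.
deliver :: Update -> Shard -> Shard
deliver u sh
  | not (relevant u sh) = sh
  | otherwise = case u of
      Insert k v -> sh { objects = Map.insert k v (objects sh) }
      ClearAll   -> sh { objects = Map.empty }
```

With shards owning (0,99) and (100,199), an Insert for key 150 only changes the second shard, while ClearAll reaches both.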

If every ShardUpdateCommand is delivered to every shard in a consistent order with a sequence number, then we should have a consistent view of state regardless of what shard configuration we happen to have.  The shards that don't care about an update will ignore it.  The shards that do will apply it.  If we are unsharded then all updates are within the range of global state.
> type LogEntry shardUpdateCommand = (SequenceNumber, shardUpdateCommand)
The sequenceNumber guarantees that global state depends only on the log entries and not on the network.  So now all queries are with respect to state as of a given SequenceNumber:
> class (Shardable (c o)) => Query qCommand (c o) r v where
>    qMap::(qCommand, SequenceNumber) -> c o -> r
>    qReduce::[r] -> v

The mechanics of a global query are:
  1. Look up the current sequence number
  2. Distribute the serialized query command to all shards packaged with the sequence number
  3. Collect the results back into a list
  4. Reduce the result list to a value
Since a shard can always follow the log up to any sequenceNumber, it can always run the qMap function as of exactly that sequence number.  The result is a consistent view of global state as long as all shards are reachable.

So, if you want the top 10 scoring users this month, you create a qMap function that gets the top 10 users for each shard and then a reduce function that finds the top 10 among the results.  If you want the total number of users, you create a qMap that gets the size of each shard and let your reduce function be (sum).
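
Both examples can be sketched on concrete types.  The assumptions here: state is a map from user to score, a log entry sets one user's score, a shard is modelled by a predicate saying which users it owns, and the names stateAsOf, globalQuery, qMapTop, qReduceTop, qMapCount and qReduceCount are all hypothetical.

```haskell
import qualified Data.Map as Map
import Data.List (sortBy)
import Data.Ord (comparing, Down(..))

type SequenceNumber = Int
type User = String
type Scores = Map.Map User Int
type LogEntry = (SequenceNumber, (User, Int))  -- "set this user's score"

-- A shard's state as of sequence number n: replay the entries numbered
-- up to n that concern users this shard owns (later entries win).
stateAsOf :: (User -> Bool) -> SequenceNumber -> [LogEntry] -> Scores
stateAsOf mine n entries =
  Map.fromList [ (u, s) | (i, (u, s)) <- entries, i <= n, mine u ]

top :: Int -> [(User, Int)] -> [(User, Int)]
top k = take k . sortBy (comparing (Down . snd))

-- Top-k query: each shard reports its own top k, the reducer picks again.
qMapTop :: Int -> Scores -> [(User, Int)]
qMapTop k = top k . Map.toList

qReduceTop :: Int -> [[(User, Int)]] -> [(User, Int)]
qReduceTop k = top k . concat

-- Count query: each shard reports its size, the reducer is sum.
qMapCount :: Scores -> Int
qMapCount = Map.size

qReduceCount :: [Int] -> Int
qReduceCount = sum

-- Run a query on every shard as of the same sequence number, then reduce.
globalQuery :: SequenceNumber -> [LogEntry] -> [User -> Bool]
            -> (Scores -> r) -> ([r] -> v) -> v
globalQuery n entries shards qMap qReduce =
  qReduce [ qMap (stateAsOf mine n entries) | mine <- shards ]
```

Because every shard answers as of the same sequence number, the reduced value does not depend on how the users happen to be split across shards.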

Restructuring Shards And Checkpointing

Every shard checkpoints, so if you need to split up a shard you can create a new set of servers that recover from that shard's last checkpoint, divide, and then process the log relevant to them.  If you need to merge, you can bring on a machine that pulls all the component shards, merges, and then updates from the aggregate.

The key point is that the overall application may have overlapping shard checkpoints associated with splitting and merging.  And that is completely ok.
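
A small sketch of the split path, under several assumptions: keys are Ints in a fixed hypothetical key space 0..999, shard number ix of n owns an equal-width slice of it, and log entries are plain inserts.  rangeOf and catchUp are hypothetical helpers; divide and merge follow the Shardable sketch above but on a concrete Map.

```haskell
import qualified Data.Map as Map

type NumShards = Int
type Index = Int
type SequenceNumber = Int
type Shard = Map.Map Int String
type LogEntry = (SequenceNumber, (Int, String))  -- insert key value

-- Hypothetical fixed key space: keys are 0..999.
keySpace :: Int
keySpace = 1000

-- The inclusive key range owned by shard ix out of n.
rangeOf :: NumShards -> Index -> (Int, Int)
rangeOf n ix = (lo, hi)
  where
    width = (keySpace + n - 1) `div` n
    lo    = ix * width
    hi    = min keySpace (lo + width) - 1

-- Become the Index shard out of NumShards: keep only the owned keys.
divide :: Shard -> NumShards -> Index -> Shard
divide s n ix = Map.filterWithKey (\k _ -> lo <= k && k <= hi) s
  where (lo, hi) = rangeOf n ix

merge :: [Shard] -> Shard
merge = Map.unions

-- Replay the log entries after the checkpoint that fall in the given range.
catchUp :: (Int, Int) -> SequenceNumber -> [LogEntry] -> Shard -> Shard
catchUp (lo, hi) cp entries s = foldl step s entries
  where
    step acc (i, (k, v))
      | i > cp && lo <= k && k <= hi = Map.insert k v acc
      | otherwise                    = acc
```

A new server for slice ix would start from `divide checkpoint n ix` and then run catchUp over its own range; merging all the slices should give the same state as one unsharded replay of the log.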

Recovering Global ACID

Now we need to recapture full ACID properties for global state.  For example, we may want to cap the total number of objects of a particular type in the database and these objects are spread across all shards.  We might want to do a balance transfer and want to make sure the debited account has enough to support the transfer.  The two accounts may be on different shards.

Because queries are a function of sequence number, we can solve this problem as follows:

We define an update/insert function that is dependent on the value of a global query.  Since every insert/update has a particular sequence number, we can get atomicity/consistency by always delivering to it the value of the query as of the prior sequence number.  This is the key idea of this whole email.  So if this paragraph doesn't make sense, read it again.

Formally, this looks something like:
> class (Shardable (c o),Query qCommand (c o) r v,ShardUpdateCommand uCommand (c o)) 
>    => Transaction qCommand uCommand where
>    tQuery::qCommand -> SequenceNumber -> v -- do the query as of a sequence number
>    tUpdate::v -> uCommand -- convert the query value to an update command 
>    transact::uCommand -> c o -> c o
>
So now we want to redefine our transaction log entry as:
> type TLogEntry = (Transaction qCommand uCommand) => (SequenceNumber, (qCommand,uCommand))
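
For the balance transfer case, a minimal concrete sketch might look like the following.  It assumes each shard is a map from account to balance and that an account lives on exactly one shard; Transfer, Update, step and runLog are hypothetical names, and qMap, qReduce, tUpdate and transact are specialised to this one transaction type rather than being the class methods above.

```haskell
import qualified Data.Map as Map

type Account = String
type Balances = Map.Map Account Int  -- one shard's accounts
type SequenceNumber = Int

data Transfer = Transfer { from :: Account, to :: Account, amount :: Int }

type TLogEntry = (SequenceNumber, Transfer)

-- Query, map side: this shard's view of the debited account (0 if absent).
qMap :: Transfer -> Balances -> Int
qMap t = Map.findWithDefault 0 (from t)

-- Query, reduce side: only one shard holds the account, so sum recovers it.
qReduce :: [Int] -> Int
qReduce = sum

data Update = NoOp | Move Account Account Int
  deriving (Show, Eq)

-- tUpdate: turn the global query value into the update to broadcast.
tUpdate :: Transfer -> Int -> Update
tUpdate t available
  | amount t > 0 && available >= amount t = Move (from t) (to t) (amount t)
  | otherwise                             = NoOp

-- transact: a shard only changes the accounts it actually holds.
transact :: Update -> Balances -> Balances
transact NoOp         bs = bs
transact (Move a b n) bs = Map.adjust (+ n) b (Map.adjust (subtract n) a bs)

-- One log entry: query the state left by the prior sequence number, then
-- hand every shard the same resulting update.
step :: [Balances] -> TLogEntry -> [Balances]
step shards (_, t) = map (transact u) shards
  where u = tUpdate t (qReduce (map (qMap t) shards))

runLog :: [Balances] -> [TLogEntry] -> [Balances]
runLog = foldl step
```

With alice (100) and bob (0) on different shards, two logged transfers of 70 from alice to bob leave alice at 30 and bob at 70: the second one sees the balance as of the first and becomes a NoOp on every shard.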

Recapping the big ideas:
  • Atomicity: A transaction is a query plus a set of inserts and an update.
  • Consistency: The type system, the state monad
  • Isolation: Transactions are run in order and not in parallel
  • Durability: Transactions are logged and all shards checkpoint their state.
The major point to observe here is that while we are solving the memory problem completely, we are solving the throughput problem only partially.  Every slave sees the full update log.  We assume they can all decide what to ignore fast enough for the application.  If that turns out not to be the case, then you get stuck in the muck of other semi-consistent transaction systems.

The reason to be optimistic is that range lookups are something computers are really fast at.  Facebook has 250m users.  Assume that 25m users per hour each make one update in peak hours; 25,000,000 / 3,600 comes to only about 7000 updates per second.  There is no reason a modern CPU can't do 7000 range checks per second.  The other problem is the network load of distributing 7000 updates per second to e.g. 10k 10GB servers (100TB).  But there are lots of network broadcast protocols.  And since each slave is supposed to get a copy, obtaining missing pieces from neighboring slaves should be relatively lightweight (though beyond the scope of this document).

-Alex-

