I like this approach. It should be much simpler to implement than
multi-master. However, the latency involved with assigning a new
master is rather steep and could almost be described as a single point
of failure. Since upgrading the master is basically a controlled
crash, this is a sore point that would be felt often.
> == Safe Zero Downtime Cluster Migration ==
>
> Upgrade slaves one by one. If they can get to current state, run user
> defined tests, and handle query volume without barfing, keep updating.
> Update the master last.
>
> That being said, it would be nice to have faster detection of migration
> errors. The system should be modified to write a hash of the type
> structure of state and logs at the head of state and log files and check
> that this hash matches at load time. Bonus points for TH that (at compile
> time) writes the hashes to a file in the source hierarchy so you can find
> the code version that matches this particular saved state later.
The migration system limits the usefulness of a simple hash, but I'm
sure we can find a satisfactory solution.
I'm having a hard time grokking this. The number of shards isn't
determined by the number of nodes in the cluster?
Here's what I understand so far:
* A 'cluster' is a collection of 'nodes'. Each 'node' is capable of
computations.
* A 'component' is the concept of a Haskell data-structure. It may
exist in fragments across the cluster, although this wouldn't be
directly visible to the programmer.
* A 'shard' is a subset or slice of a 'component'. Shards are
"physical" manifestations of components and should be available (for
queries) on at least three nodes.
** I'm not sure how many replicas are needed. We do not need them to
prevent data loss anymore, but too few of them could lead to
interruptions of service.
* There are two distinct designations: Shard master and shard member.
Both designations are parameterized by shard number and component id.
* A 'node' can have multiple designations.
* A shard master must also be a shard member for the same shard and component.
* For each shard in every component, one (and only one) node must be
designated as 'shard master'.
* Computational obligations are determined by designations:
** For shard members: Apply queries locally, send updates to the
shard master, and apply updates from the shard master locally.
** For shard masters: Receive updates from shard members, apply
updates locally, and stream update events to the shard members after
they've been applied.
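For concreteness, the vocabulary above could be pinned down with a few Haskell types. This is only a sketch of my reading of the design; every name here is invented, not taken from any actual implementation:

```haskell
import qualified Data.Map as Map
import qualified Data.Set as Set

newtype NodeId      = NodeId Int      deriving (Eq, Ord, Show)
newtype ComponentId = ComponentId Int deriving (Eq, Ord, Show)
newtype ShardId     = ShardId Int     deriving (Eq, Ord, Show)

-- Both designations are parameterized by component id and shard number.
data Designation
  = ShardMember ComponentId ShardId
  | ShardMaster ComponentId ShardId
  deriving (Eq, Ord, Show)

-- A node can hold several designations at once.
type Cluster = Map.Map NodeId (Set.Set Designation)

-- Invariant from the list above: every shard master is also a shard
-- member for the same shard and component.
mastersAreMembers :: Cluster -> Bool
mastersAreMembers cluster =
  and [ ShardMember c s `Set.member` ds
      | ds <- Map.elems cluster
      , ShardMaster c s <- Set.toList ds ]
```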
Since each shard master has to maintain an event log and checkpoint
file, each of them needs a separate block store. Managing that many
filesystems would be cumbersome.
--
Cheers,
Lemmih
On Fri, Jul 24, 2009 at 6:47 PM, S. Alexander Jacobson <al...@alexjacobson.com> wrote:
> I've been thinking through what it takes to get happs to industrial
> strength on reasonable network architectures. The original multi-master
> idea was based on the assumption that reliable network attached storage
> was expensive/hard; that it was in some sense cheaper to use a local
> disk for each instance. Network attached storage has gotten a lot
> cheaper/easier over the last four years. Amazon Elastic Block Store
> makes it practically effortless for EC2 instances. Given the added
> complexity of finishing the implementation and proving multi-master
> reliability, it may make sense now to switch to master-slave
> architecture (which also allows us to eliminate the Spread dependency).
> Why would the latency be high? If the election period is e.g. .5
> seconds, then it takes 1 second to discover that a majority can't see
> the master and, if the network is not segmented, for a slave that is
> already up-to-date, simply to start handling updates itself.
> == Easy Amazon EC2 Provisioning Monad for Single Master operation ==
>
> Most Haskell developers do not have easy access to multi-server data
> centers on which to develop. However, Amazon, Rackspace and others now
> provide provisioning APIs, network load balancing, server monitoring,
> and network storage in relatively easy to use packages. It makes sense
> for HAppS to target these data center APIs. I imagine a HAppS
> datacenter monad with the following functions:
>
>   type ResultInfo = Either Error Success
>   newState  :: DC StateId  -- creates state and log network storage volumes
>                            -- for the app (elastic block stores on Amazon)
>   copyState :: StateId -> DC StateId     -- useful for testing
>   delState  :: StateId -> DC ResultInfo  -- useful when done testing
>   upState   :: DomainName -> AppCode -> StateId -> DC ResultInfo
>             -- start/upgrade app on state from a given domain name
>   downState :: StateId -> EC2 (Either Error Success)
>             -- shutdown running cluster
>
> Note that Amazon provides auto-scaling so you don't have to define a
> number of servers required for your app. It also provides
> ElasticLoadBalancing so at this stage we don't need to write that code
> into HAppS.
To be slightly more precise, the new master has to know exactly which
update event was the last to be written to the log file.
> Is there other latency that I am missing?
I feared (and still do) that the new master would have to replay the event log.
I thought the idea of the master-slave approach would be to sidestep
sequencing issues by letting the master decide on the order to apply
updates. If each node has to determine the order of events
independently then we're right back at the multi-master approach.
In other words, we can't rely on the master for event ordering since
it cannot write a log entry and broadcast an event in a single atomic
operation.
> == Safe Zero Downtime Cluster Migration ==
>
> Upgrade slaves one by one. If they can get to current state, run user
> defined tests, and handle query volume without barfing, keep updating.
> Update the master last.
>
> That being said, it would be nice to have faster detection of migration
> errors. The system should be modified to write a hash of the type
> structure of state and logs at the head of state and log files and check
> that this hash matches at load time. Bonus points for TH that (at compile
> time) writes the hashes to a file in the source hierarchy so you can find
> the code version that matches this particular saved state later.
>
>
> The migration system limits the usefulness of a simple hash, but I'm
> sure we can find a satisfactory solution.
>
>
> It seems like we can use Data.Data and Data.Typeable to give each full
> datatype a unique hash...?
Sure, but a simple hash may not be that useful. Say you change some
sub-type in your data structure from 'String' to the newtype 'Name'.
Now the hash will change but you'd still be able to parse your old
data if you defined a migration from 'String' to 'Name'.
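To illustrate the point, here is a toy structural fingerprint built on Data.Data. It walks only one level of structure and the types are invented for the example; the point is just that the hash of the type structure changes under a `String` -> `Name` refactor even though a migration function can still recover the old data:

```haskell
{-# LANGUAGE DeriveDataTypeable #-}
import Data.Data
import Data.List (foldl')

newtype Name = Name String   deriving (Data, Typeable)
data UserV1  = UserV1 String deriving (Data, Typeable)
data UserV2  = UserV2 Name   deriving (Data, Typeable)

-- One level of structure: constructor name plus the types of its fields.
describe :: Data a => a -> String
describe x = showConstr (toConstr x)
          ++ concatMap (' ' :) (gmapQ (\f -> show (typeOf f)) x)

-- Any string hash will do for the illustration.
hashStructure :: Data a => a -> Int
hashStructure = foldl' (\h c -> h * 33 + fromEnum c) 5381 . describe

-- The hashes of UserV1 and UserV2 differ, yet the old data is still
-- perfectly readable via a migration.
migrate :: UserV1 -> UserV2
migrate (UserV1 s) = UserV2 (Name s)
```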
Just that masters also function as members for their respective
shards. That is, a node can't be a master if the shard isn't available
locally.
> * For each shard in every component, one (and only one) node must be
> designated as 'shard master'.
>
>
> yes.
>
> * Computational obligations are determined by designations:
> ** For shard members: Apply queries locally, send updates to the
> shard master, and apply updates from the shard master locally.
> ** For shard masters: Receive updates from shard members, apply
> updates locally, and stream update events to the shard members after
> they've been applied.
>
>
>
> Also, for non-members of the shard, distribute queries/updates to all
> shards. But get/put/post/delete only to the shard with the relevant
> objectId.
>
> My instinct is not to require coherence at this time so if the app is
> concerned that not all shards have received an update query, it can ask
> again or handle at the application level in some way.
>
> That being said, it would be nice if the system provided some way to
> maximize the likelihood of full distribution of queries and updates so we
> are likely to want to use spread in some way here.
>
> Since each shard master has to maintain an event log and checkpoint
> file, each of them needs a separate block store. Managing that many
> filesystems would be cumbersome.
>
>
>
> Among the reasons that I like targeting Amazon Web Services is that their
> Elastic Block Store makes this really easy. The have an API that provisions
> new block stores in real time. See <http://aws.amazon.com/ebs/>
Oh, excellent.
--
Cheers,
Lemmih
Summary from chat with Alex:
Replaying events from the log /is/ necessary but not a big problem
since the number of new events is likely to be small. That is, when
the old master goes down, the new master reads the event log backwards
to find events that were recorded but not sent to the slaves.
So, let's recap:
* A single master is responsible for the order of update events. The
sequenced events are saved to the log file and then broadcast to the
slaves.
* Slaves try to stay current by applying the event stream from the master.
* If/When the master dies, a new master is elected.
* The new master replays the events log from the highest common event
ID that each slave received. This closes the gap between saving events
on disk and sharing them with the slaves.
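The failover step in the recap is pure enough to sketch directly. All names here are invented; this is just the "replay from the highest common event ID" rule as a function:

```haskell
type EventId = Int

data Event a = Event { eventId :: EventId, body :: a }

-- Given the durable log (ascending by event id) and the highest event id
-- each surviving slave has applied, the new master must re-broadcast
-- everything after the *lowest* of those high-water marks: events at or
-- below it were seen by every slave.
replayGap :: [Event a] -> [EventId] -> [Event a]
replayGap logEvents []    = logEvents
replayGap logEvents marks =
  dropWhile ((<= minimum marks) . eventId) logEvents
```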
--
Cheers,
Lemmih
> Replaying events from the log /is/ necessary but not a big problem
> since the number of new events is likely to be small. That is, when
> the old master goes down, the new master reads the event log backwards
> to find events that were recorded but not sent to the slaves.
>
> So, let's recap:
> * A single master is responsible for the order of update events. The
> sequenced events are saved to the log file and then broadcast to the
> slaves.
> * Slaves try to stay current by applying the event stream from the master.
> * If/When the master dies, a new master is elected.
> * The new master replays the events log from the highest common event
> ID that each slave received. This closes the gap between saving events
> on disk and sharing them with the slaves.
Am I right in concluding that you mean that writes are only permitted on
the master (a la MySQL)?
G
--
Gregory Collins <gr...@gregorycollins.net>
A master is determined to be dead by election. The order of succession
is predefined for a cluster.
The election process is that every 500ms, each server broadcasts the
list of servers from which it received a broadcast in the prior 500ms as
well as its current highest log sequence number. The master is declared
dead if the majority of servers didn't receive a broadcast from it in
the last period.
When the top ranked slave takes over as master, it plays back the log
from the lowest sequence number received in the last broadcast.
However, a slave may request missing log entries if it was also down
during the last broadcast period.
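The liveness rule above reduces to a majority count over the per-period reports. A speculative sketch (types and names invented, election period elided):

```haskell
import qualified Data.Set as Set

type Server = String

-- One report per surviving server: the set of servers it heard from in
-- the last 500ms period. The master is declared dead when a majority of
-- servers did not hear from it.
masterDead :: Server -> [Set.Set Server] -> Bool
masterDead master reports = 2 * missing > length reports
  where missing = length [ r | r <- reports, not (master `Set.member` r) ]

-- Succession is predefined per cluster: the top-ranked live slave wins.
successor :: [Server] -> Set.Set Server -> Maybe Server
successor ranking alive =
  case [ s | s <- ranking, s `Set.member` alive ] of
    s : _ -> Just s
    []    -> Nothing
```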
-Alex-
> I am not a current user of MySQL but my understanding is that it
> requires the client to know which server is the master and send
> updates to one server and queries to the others.
Right.
> In the design we are discussing here, the framework takes care of that
> routing and the app is protected from knowing the identity of the
> master rather than the slave. All nodes look the same from the user
> perspective. Writes are permitted on any node. This is just a
> question of how the system does routing internally.
But there still is a single master which is responsible for maintaining
an authoritative copy of the update log (albeit "behind the curtain")?
> To be clear, as long as the routing is kept hidden from the user, the
> reason to choose multi-master versus master-slave has much more to do
> with expectations about the availability of reliable network attached
> storage rather than any performance issues. Master-slave should
> actually have slightly better performance characteristics because it
> does not have to wait on an ACK from all the other servers. The
> reason we designed for multi-master originally was that network
> attached storage was complex and expensive and so we preferred to
> achieve durability by having each server write a copy of the
> checkpoint and log to its own disk. With the advent of Amazon Elastic
> Block Store and similar services, the balance has shifted so now
> Master-Slave seems like the preferred architecture internally.
OK. The reason I ask is that forcing writes to (ultimately) be shunted
through a single machine might result in a bottleneck under certain
conditions (jobs with very heavy write loads, for instance). My thinking
is that the desirability of multi-master is not necessarily a matter of
the reliability of network-attached storage, but rather that it allows
you to handle a write load heavier than a single machine can handle.
That alternative is not 100% appealing either though, you then introduce
other difficult issues -- you need something like consistent hashing to
identify where a particular data item resides in the cluster, write
conflicts necessitate a read-repair scheme (i.e. vector clocks), without
a quorum protocol you trade ACID for "eventual consistency", etc.
> Master-slave does not substantially change bottleneck properties. No
> matter what, there is a total order on all updates to state (or later
> to a shard of state).
Is that really true? It isn't obvious to me (but I'm pretty dense!)
Obviously if you presuppose ACID semantics then this has to be true. Of
course, in a distributed context maintaining the ACID semantics implies
one of a couple of conclusions (and apologies if this isn't an
exhaustive list, I'm not a domain expert here):
* you have a single master which maintains the authoritative update
log and is responsible for the total order you mentioned. Even if
the data itself is distributed/sharded across nodes in the cluster,
this implies a single choke point for updates to a metadata
repository. Here you spend less time in communication between nodes
but your write throughput is limited to how quickly the master can
perform metadata updates. If the master dies you have deep problems
to solve re: election of a new master.
* you allow writes to any node in the cluster, and the nodes use some
kind of consensus/quorum protocol (like Paxos) to enforce total
ordering in the metadata. Here you might be able to distribute write
load more evenly but Paxos has a lot of communication overhead
(which is why e.g. Google uses it so sparingly)
Another choice is to throw ACID out the window -- this allows you to
make great gains in write throughput, with the caveat that simultaneous
updates to the same data item can cause conflicts that need to be
detected and repaired somehow. You can detect conflicts using vector
clocks
(http://research.microsoft.com/users/lamport/pubs/time-clocks.pdf) but
then you have to fix the conflict somehow, and the right conflict
resolution policy depends on the data domain (i.e. for some problems
maybe you can merge, for some you might have to revert to an earlier
state and force the user to make a choice, etc.)
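A minimal vector-clock sketch of the conflict detection mentioned above (this is the standard construction, not anything from -State; two updates conflict exactly when neither clock dominates the other):

```haskell
import qualified Data.Map as Map

-- One counter per node.
type VClock = Map.Map String Int

-- Bump the local counter on an update at node n.
tick :: String -> VClock -> VClock
tick n = Map.insertWith (+) n 1

-- a `descends` b: a has seen every event b has.
descends :: VClock -> VClock -> Bool
descends a b = and [ Map.findWithDefault 0 k a >= v | (k, v) <- Map.toList b ]

-- Concurrent (conflicting) updates: neither dominates.
concurrent :: VClock -> VClock -> Bool
concurrent a b = not (a `descends` b) && not (b `descends` a)
```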
BTW: let me know if I'm blathering on about stuff everyone knows already
:)
> The only change is that you don't have to wait for other members to
> ACK before considering an update official.
>
> Thinking about transaction semantics for sharding, here are my
> thoughts:
>
> 1. Updates to sharded components will be paired with an optional
> query command.
> 2. The updateCommand gets written to a global update log for all
> shards.
> 3. The optional query is executed against all shards as of the
> sequence number of the corresponding updateCommand.
> 4. The queryResults are then distributed to each shard along with the
> updateCommand.
> 5. Each shard executes the updateCommand locally.
>
> The good news about this is that we retain ACID semantics across
> state. For example, the query can verify that two users still exist
> before adding each to the other's friend list. The query can verify
> that the from bank account has money before initiating a
> transfer. etc.
Maintaining ACID semantics here is very tricky and there are many
pitfalls to deal with; I certainly don't feel equipped to tackle it myself!
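For reference, the five numbered steps might look roughly like this in code. Everything here is invented to make the dataflow concrete; the real command and state types would of course be richer:

```haskell
-- Stand-in for one shard's data.
type ShardState = [(String, Int)]

data SeqUpdate q = SeqUpdate
  { seqNo  :: Int                          -- step 2: position in the global log
  , query  :: Maybe ([ShardState] -> q)    -- steps 1/3: optional global query
  , update :: Maybe q -> ShardState -> ShardState  -- step 5: per-shard command
  }

-- Steps 3-5: evaluate the query against all shards as of this sequence
-- number, then hand the result to every shard along with the command.
runUpdate :: SeqUpdate q -> [ShardState] -> [ShardState]
runUpdate u shards = map (update u result) shards
  where result = fmap ($ shards) (query u)

-- e.g. only apply a debit if the account exists on *some* shard:
transfer :: SeqUpdate Bool
transfer = SeqUpdate
  { seqNo  = 42
  , query  = Just (\ss -> any (any ((== "alice") . fst)) ss)
  , update = \ok s ->
      if ok == Just True
        then [ (k, if k == "alice" then v - 10 else v) | (k, v) <- s ]
        else s
  }
```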
> The theoretical bad news is that worst case performance when you need
> full ACID semantics, is throughput limited to the network latency of
> this system. But this worst case performance assumes that every
> update steps on every other update. In practice, most of the time
> they don't (or else you would design the app differently for other
> reasons). So in practice, a few optimizations get you back to high
> throughput.
>
> 1. Most updates do not have global dependency and therefore don't
> carry a query so don't impose this penalty
>
> 2. The system can do speculative query execution and rollback updates
> only when query values actually change (assumption is that query
> execution time is much less than network latency).
>
> 3. The programmer can provide functions that allow the system to know
> what sorts of prior updates don't interfere. For example, if no
> withdrawal or transfers occurred involving the relevant accounts
> since the speculative query was executed, the query results can be
> deemed good. Note, I am skeptical that this actually helps much.
> CPU power is sufficiently cheap that double query execution is a
> worthwhile trade-off to programmer effort providing these hints.
To be honest I'm not quite sure I'm following you here -- this is a
pretty abstract discussion and I don't feel comfortable that I'm smart
enough to stay "tethered to reality" here (if that makes sense). I can't
say for sure, but my "sense-feeling" tells me that determining whether
one update will conflict with another in the general case is probably
NP-complete (please prove me wrong!)
> Does this make sense? Do a distributed query that establishes the
> preconditions for local updates to each shard with speculative
> query/update execution to maintain throughput against network latency?
Once you go into this speculative update territory you have to introduce
dirty (and difficult) details like rollback/read repair/etc. Your
likelihood for introducing data inconsistency/corruption also goes
through the roof -- there's a reason why we don't have multimaster MySQL
servers, for instance!
--
Need somewhere to put your code? http://patch-tag.com
Want to build a webapp? http://happstack.com
"S. Alexander Jacobson" <al...@alexjacobson.com> writes:Master-slave does not substantially change bottleneck properties. No matter what, there is a total order on all updates to state (or later to a shard of state).Is that really true? It isn't obvious to me (but I'm pretty dense!)
Obviously if you presuppose ACID semantics then this has to be true. Of course, in a distributed context maintaining the ACID semantics implies one of a couple of conclusions (and apologies if this isn't an exhaustive list, I'm not a domain expert here):
* you have a single master which maintains the authoritative update log and is responsible for the total order you mentioned. Even if the data itself is distributed/sharded across nodes in the cluster, this implies a single choke point for updates to a metadata repository. Here you spend less time in communication between nodes but your write throughput is limited to how quickly the master can perform metadata updates. If the master dies you have deep problems to solve re: election of a new master.
* you allow writes to any node in the cluster, and the nodes use some kind of consensus/quorum protocol (like Paxos) to enforce total ordering in the metadata. Here you might be able to distribute write load more evenly but Paxos has a lot of communication overhead (which is why e.g. Google uses it so sparingly)
Does this make sense? Do a distributed query that establishes the preconditions for local updates to each shard with speculative query/update execution to maintain throughput against network latency?Once you go into this speculative update territory you have to introduce dirty (and difficult) details like rollback/read repair/etc. Your likelihood for introducing data inconsistency/corruption also goes through the roof -- there's a reason why we don't have multimaster MySQL servers, for instance! G
== S3 vs EC2 (for Matthew Elder) ==
We picked the multimaster architecture originally because reliable network attached storage was expensive and complex. When Amazon introduced EC2, the only durable storage was its Simple Storage Service (aka S3). S3 was an asynchronous mechanism so you had no easy way to know when your write had been made durable so we stuck with multimaster as the best architecture.
Now that Amazon has introduced Elastic Block Store, a synchronous file store that is easily remountable by any EC2 server, it seems preferable to focus on multi-slave and let EBS take care of durability.
> == Multi-master vs Multi-Slave ==
>
> Current state is whatever total order on updates has been made
> durable. With multimaster, durability is achieved by having multiple
> masters confirm writes to their local disks. With master-slave,
> durability happens only on the master's disk and we trust that:
So let me see if I'm following you: for the purposes of this
conversation so far we're talking *only* about using replication to
ensure that each node in a cluster of happstack-state instances contains
an identical replica of the same data? (Putting issues of sharding/data
partitioning aside?)
> == ACID Sharding ==
>
> Let's go through the letters.
>
> Atomicity: With sharding, an atomic transaction is a global query
> paired with a per shard update. Unlike with SQL, a single global
> query can do a wide variety of lookups and calculations as can our
> update -- because both are full-on haskell functions. As a result we
> don't need to interleave lots of different interactions and engage in
> the complex concurrency management forced on MySQL, Oracle, etc.
What I don't understand here is how the data actually gets
sharded/partitioned. Take for example "Map k v". If I understand the
current system correctly (which is a long shot!), with a single node we
just store this map in memory, and checkpoint the entire thing to disk
periodically using the Serialize instance.
So if you want to shard a Map across your cluster, how is this done?
> Consistency: With sharding, there is still a single log master for all
> shards. Haskell purity guarantees consistency in the event of a log
> replay. I am not worried at this point that log write throughput is
> an issue.
So each "shard" could consist of e.g. a master and zero or more slave
replicas (keeping with your statement that replication is orthogonal to
sharding)? E.g. twelve machines could be configured as such?:
      M         M         M         M
      S         S         S         S
      S         S         S         S
   shard 1   shard 2   shard 3   shard 4
> * you have a single master which maintains the authoritative
> update log and is responsible for the total order you
> mentioned. Even if the data itself is distributed/sharded across
> nodes in the cluster, this implies a single choke point for
> updates to a metadata repository. Here you spend less time in
> communication between nodes but your write throughput is limited
> to how quickly the master can perform metadata updates. If the
> master dies you have deep problems to solve re: election of a
> new master.
>
> What do you mean by metadata updates here?
That's a good question. The proposed design we're talking about isn't
clear to me at all, so please endure my pedantic questions. If you have
N shards in your cluster, you probably want to distribute 1/Nth of the
write load to each shard in the cluster. By "metadata" I mean keeping
track of what data is written where, but I guess I've been
misunderstanding you -- you want all of the writes to be written to the
master transaction log?
I think this discussion would be a little clearer to me if the
implementation details were more concrete (could we have a constructive
proof please? :) )
"S. Alexander Jacobson" <al...@alexjacobson.com> writes:== Multi-master vs Multi-Slave == Current state is whatever total order on updates has been made durable. With multimaster, durability is achieved by having multiple masters confirm writes to their local disks. With master-slave, durability happens only on the master's disk and we trust that:So let me see if I'm following you: for the purposes of this conversation so far we're talking *only* about using replication to ensure that each node in a cluster of happstack-state instances contains an identical replica of the same data? (Putting issues of sharding/data partitioning aside?)
== ACID Sharding == Let's go through the letters. Atomicity: With sharding, an atomic transaction is a global query paired with a per shard update. Unlike with SQL, a single global query can do a wide variety of lookups and calculations as can our update -- because both are full-on haskell functions. As a result we don't need to interleave lots of different interactions and engage in the complex concurrency management forced on MySQL, Oracle, etc.What I don't understand here is how the data actually gets sharded/partitioned. Take for example "Map k v". If I understand the current system correctly (which is a long shot!), with a single node we just store this map in memory, and checkpoint the entire thing to disk periodically using the Serialize instance. So if you want to shard a Map across your cluster, how is this done?
Consistency: With sharding, there is still a single log master for all shards. Haskell purity guarantees consistency in the event of a log replay. I am not worried at this point that log write throughput is an issue.So each "shard" could consist of e.g. a master and zero or more slave replicas (keeping with your statement that replication is orthogonal to sharding)? E.g. twelve machines could be configured as such?: M M M M S S S S S S S S shard 1 shard 2 shard 3 shard 4
* you have a single master which maintains the authoritative update log and is responsible for the total order you mentioned. Even if the data itself is distributed/sharded across nodes in the cluster, this implies a single choke point for updates to a metadata repository. Here you spend less time in communication between nodes but your write throughput is limited to how quickly the master can perform metadata updates. If the master dies you have deep problems to solve re: election of a new master. What do you mean by metadata updates here?That's a good question. The proposed design we're talking about isn't clear to me at all, so please endure my pedantic questions. If you have N shards in your cluster, you probably want to distribute 1/Nth of the write load to each shard in the cluster. By "metadata" I mean keeping track of what data is written where, but I guess I've been misunderstanding you -- you want all of the writes to be written to the master transaction log?
I think this discussion would be a little clearer to me if the implementation details were more concrete (could we have a constructive proof please? :) )
> So let me see if I'm following you: for the purposes of this
> conversation so far we're talking *only* about using replication to
> ensure that each node in a cluster of happstack-state instances contains
> an identical replica of the same data? (Putting issues of sharding/data
> partitioning aside?)
>
> The issue of multi-master vs multi-slave is orthogonal to that of the
> transaction semantics of sharding; probably orthogonal to transaction
> semantics generally.
I'm not sure that's true, as we'll get to below re: Data.Map. Whether or
not you're sharding and how you're replicating has a huge impact on how
you have to design for transactional integrity (and vice versa).
Let's say you have a Data.Map in your data store, sharded across the
cluster. You have a request that comes in that updates the map:
    l  = map (\x -> (x,x)) $ take 100 $ iterate (+1) 1
    m' = Map.union m $ Map.fromList l
and you want this to be committed as a transaction, all or nothing. That
update will be sharded across the cluster -- each node will get 1/Nth of
the keys. What happens if I receive a (more or less) simultaneous
request to compute the values for keys between (25,40) non-inclusive?
You'll have to map-reduce across the cluster to get this, which means
that each node needs to work relative to the same transaction log
sequence number across the entirety of the query.
I think that immediately establishes a requirement that each node has to
keep a buffer of the last N values (relative to the transaction log
number) because a distributed update might come in while you're trying
to satisfy a distributed query, and the data has to stay consistent
w.r.t. transactions. So I wouldn't say exactly that replication scheme
is *orthogonal* to transaction semantics, a design decision in one will
have consequences for the other.
> What I don't understand here is how the data actually gets
> sharded/partitioned. Take for example "Map k v". If I understand the
> current system correctly (which is a long shot!), with a single node we
> just store this map in memory, and checkpoint the entire thing to disk
> periodically using the Serialize instance.
>
> So if you want to shard a Map across your cluster, how is this done?
>
> My default assumption is that you want to shard an ixset rather than a
> map, but, if you want to shard a map, the natural approach is probably
> to a hash on the key. You then have a directory that maps the first n
> bits of key to a particular shard (or set of shards depending on how
> we tackle adding/removing shards).
>
> You also need a filter for every update command that converts it to
> one command per shard. For example, if you had an insertList command,
> it would convert that into one insertList per shard with the elements
> in each list being only those eligible to be inserted into that shard.
> The map user does not see this directory. It is used only by the
> filter commands.
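The hash-directory scheme described in the quote could be sketched roughly like this (the hash is a toy stand-in for a real hash function, and all names are invented):

```haskell
import Data.Bits (shiftR, xor)
import qualified Data.Map.Strict as M

type ShardId = Int

-- Toy mixer standing in for a real hash function.
mix :: Int -> Int
mix k = let h = k * 2654435761 in h `xor` (h `shiftR` 16)

-- Directory: the first nBits of the key's hash pick the shard slot.
-- With nBits there are 2^nBits slots; a real directory could remap
-- slots to physical shards when nodes are added or removed.
shardFor :: Int -> Int -> ShardId
shardFor nBits k = abs (mix k) `mod` (2 ^ nBits)

-- The per-update "filter": split an insertList command into one
-- insertList per shard, each holding only that shard's keys.
splitInsertList :: Int -> [(Int, v)] -> M.Map ShardId [(Int, v)]
splitInsertList nBits kvs =
  M.fromListWith (++) [ (shardFor nBits k, [(k, v)]) | (k, v) <- kvs ]
```

As the quote says, the map user never sees `shardFor` or the directory; only the filter commands do.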
Well we would want to be able to shard any kind of data type that
conforms to some typeclass interface(s), the same way it is now. Right
now the bar is pretty low: if you can serialize & version a datatype,
you can store it in state.
Re: Map, once you splat the keys across the cluster, you'll need to
map-reduce to satisfy most queries (e.g. a range query). You'll also
have to write a special-purpose datatype wrapper around Map that is
aware of the sharding issues. Clearly the fantasy of being able to deal
with run-of-the-mill pure haskell values has to go out the window also;
you only get to deal with datatype operations that know what to do
w.r.t. sharding.
I guess that the typeclass interface you need to write to will need to
specify all of these details. What would it look like for Data.Map? For
list? For Data.Set? For a directed graph? What's the minimal set of
operations you would need to shard an arbitrary datatype? Does such a
set exist?
In other words: my hypothesis is that every function operating on
datatypes in sharded state needs to be expressible in terms of
map-reduce.
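To test that hypothesis on "Map Int Int": a query could be a per-shard map step plus a reducer, where the reducer depends on the query. A sketch, with invented names rather than any existing API:

```haskell
import qualified Data.Map.Strict as M
import Data.List (sortBy)
import Data.Ord (Down (..), comparing)

-- Hypothetical shape of a shardable query: a per-shard map step and a
-- cluster-wide reduce step.
data ShardQuery s r v = ShardQuery
  { qMap    :: s -> r    -- run on every shard
  , qReduce :: [r] -> v  -- combine the per-shard results
  }

runSharded :: ShardQuery s r v -> [s] -> v
runSharded (ShardQuery m r) shards = r (map m shards)

-- A range query: each shard selects its local hits, the reducer
-- concatenates them.
rangeQuery :: Int -> Int -> ShardQuery (M.Map Int Int) [(Int, Int)] [(Int, Int)]
rangeQuery lo hi = ShardQuery
  { qMap    = M.toList . M.filterWithKey (\k _ -> lo <= k && k <= hi)
  , qReduce = concat
  }

-- "Top ten" needs a different reducer: merge the per-shard top tens
-- and take the overall top ten.
topTen :: ShardQuery (M.Map Int Int) [(Int, Int)] [(Int, Int)]
topTen = ShardQuery
  { qMap    = take 10 . sortBy (comparing (Down . snd)) . M.toList
  , qReduce = take 10 . sortBy (comparing (Down . snd)) . concat
  }
```

Note that for this to work, `qReduce` must tolerate per-shard results arriving in any order.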
> Given that there is one global log master, I am not sure how
> meaningful it is to talk about per shard masters. Any slave can
> supply a checkpoint for its shard.
What does it mean to have a global log master? I think of a "transaction
log" as a series of commands {c_1,...,c_n} that transform a state s_1 to
a state s_n by
s_n = c_n(c_{n-1}(...(c_1(s_1))...))
Given that our commands are arbitrary haskell functions, how do we
"replay" a transaction log for node 14 in our cluster? How is the
transaction log represented on disk? I don't think that the way it's
done now in -State (the UpdateEvent stuff) is quite going to be enough.
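One way to make replay well-defined is to log serializable *descriptions* of updates rather than arbitrary closures, and replay as a fold in sequence-number order. A toy sketch (invented names; `Show`/`Read` stands in for a real binary codec):

```haskell
import Data.List (foldl')
import qualified Data.Map.Strict as M

type SeqNo = Int

-- A serializable description of an update, rather than a Haskell
-- closure -- roughly what the UpdateEvent machinery would have to pin
-- down so a log written on one node can be replayed on another.
data Command k v
  = Insert k v
  | Delete k
  deriving (Show, Read)  -- stand-in for a real binary serialization

applyCmd :: Ord k => M.Map k v -> Command k v -> M.Map k v
applyCmd m (Insert k v) = M.insert k v m
applyCmd m (Delete k)   = M.delete k m

-- Replaying the log {c_1, ..., c_n} is just a left fold, provided the
-- entries are applied in sequence-number order.
replay :: Ord k => M.Map k v -> [(SeqNo, Command k v)] -> M.Map k v
replay s0 entries = foldl' applyCmd s0 [ c | (_, c) <- entries ]
```

The hard part, of course, is that every update command in the system must be representable this way, which is exactly the constraint on "arbitrary haskell functions" being raised above.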
> I think this discussion would be a little clearer to me if the
> implementation details were more concrete (could we have a constructive
> proof please? :) )
>
> Indeed! And this discussion is super helpful in getting us there!
Some pictures wouldn't hurt either, it's difficult to conceptualize the
design without them.
G.
--
Gregory Collins <gr...@gregorycollins.net>
Funny you should mention that - I had recently read a little on Paxos
for another Haskell library.
For my needs the main issue is that Paxos (as described on Wikipedia)
doesn't offer a method to verify the results of the acceptors without
either n*m messages between the acceptors and learners or m messages
from the proposer to all the learners with n signatures (one from each
acceptor). I did the math and found that the second way would mean a
2000 - 3000 byte message, which I felt was excessive given the
frequency of the operation, and the first way is up to 225 messages
(don't get me started about the latency etc).
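For concreteness, the arithmetic can be spelled out; the parameters below are guesses chosen to reproduce the figures above (15 acceptors and 15 learners gives the 225-message count, and a ~150-byte signature lands the signed message in the 2000-3000 byte range):

```haskell
-- Hypothetical parameters, not taken from the original message.
acceptors, learners, sigBytes :: Int
acceptors = 15
learners  = 15
sigBytes  = 150

-- Option 1: every acceptor notifies every learner directly.
directMessages :: Int
directMessages = acceptors * learners  -- n*m = 225 messages

-- Option 2: the proposer sends one message per learner, carrying one
-- signature from each acceptor as proof of acceptance.
signedMessages :: Int
signedMessages = learners              -- m = 15 messages

signedMessageBytes :: Int
signedMessageBytes = acceptors * sigBytes  -- ~2250 bytes of signatures
```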
If I could convince myself that there is a decent verification scheme
then I might write Paxos myself.
Thomas
a colleague of mine who does research in distributed systems
(and is thus familiar with the details of paxos) is trying
to do this as a sort of "hello world" experiment while
learning haskell.
a beginner myself, i'm trying to help out. so far, i've
used the FFI to get access to the ioctl() calls necessary to
discover the broadcast addresses of local network
interfaces.
if we can get it anywhere near production quality, i'll let
you all know.
daniel
> queryToAllShards shardState result::shardState->result
> perShardUpdate shardState-> result -> r2::[result]->shardState->shardState->r2
>
> shardUpdate result shardState r2::queryToAllShards shardState result->
>                                   perShardUpdate shardState result r2->
>                                   IO [r2]
>
> The system distributes the query to all shards and collects the result.
> Each shard then gets a copy of the global result which it can use to
> decide how to make local updates.
> I am not sure I understand the function you are describing here. Can
> you give a more motivated example in terms of real life need?
An example: "give me a list of the widgets recorded to state in the past
three days" -- a range query. Or "who are the top ten users this month?"
For the purposes of this discussion I've been focussing on "Map Int Int"
because if we can't come up with a scheme that works for that, more
complicated datatypes would be hopeless.
> To clarify, I am claiming that most any real-world behavior can be
> accomplished by a pair of functions of the following form:
>
>> queryToAllShards shardState result::shardState->result
>> perShardUpdate shardState-> result -> r2::[result]->shardState->shardState->r2
>>
>> shardUpdate result shardState r2::queryToAllShards shardState result->
>> perShardUpdate shardState result r2->
>> IO [r2]
The syntax is a little weird there, it's hard to see what you're getting
at. I think the requirements for a datatype to be "shardable" (besides
being serializable of course) are:
* a reduction function for query results -- you need to be able to send
the same query to every shard in the cluster and combine the
results. Which reduction function you use might depend on the query --
"top ten elements" would need a different reducer than "widgets
created yesterday".
* you need to be able to record a description of updates on the datatype
to a transaction log in an efficient binary serialization format.
* you need a partition function that, given a description of the nodes
in the cluster and a description of a data update, will give you a
list of data updates to be applied to each shard.
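Those three requirements could be bundled into a single record-of-functions interface; the sketch below uses invented names and a `String` stand-in for the efficient binary codec:

```haskell
-- s = shard-local state, q = query description,
-- r = query result, u = update description.
data Sharding s q r u = Sharding
  { runQuery  :: q -> s -> r             -- same query sent to every shard
  , reduce    :: q -> [r] -> r           -- reducer may depend on the query
  , encodeUpd :: u -> String             -- stand-in for a binary codec
  , partition :: Int -> u -> [(Int, u)]  -- split an update per shard
  }
```

A `Sharding` value for a concrete datatype is exactly what a shard-aware wrapper around `Data.Map` (or an ixset, a graph, etc.) would have to supply.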
> The system distributes the query to all shards and collects the
> result. Each shard then gets a copy of the global result which it can
> use to decide how to make local updates.
In your thinking, what are "query" and "result" and how are they
represented over the wire? What's a "global result"?
> This model definitely handles typical sharded transaction problems
> like reciprocal friend relations and bank balance transfers. Can you
> give me a real life application need for which this model is
> incompatible?
Without more concrete details about exactly what is going over the wire,
and in what order, I think it's impossible for me to make an assessment.
> The logging model for this is that a total order is put on these pairs
> by the master log. My further claim is that optimistic concurrency
> (speculative execution of queries) allows this model to perform
> reasonably well because most intervening updates are unlikely to
> change any particular query result. And if you are having performance
> problems, there are straightforward ways to rearchitect so you don't.
I still don't have a good handle on your proposed scheme. Let's shelve
discussions about complicated things like speculative updates until we
can agree on what the basics might look like. Some diagrams would really
help. Some questions for which I don't feel I know an answer:
* what are the requirements on a datatype in order for it to be
shardable? (i.e. which type classes must it implement)
* how are transactions handled? Given that data is partitioned into
(presumably) disjoint sets, how do you ensure that a query or update
against the cluster is applied to a consistent version of data?
Given that each node may have a different idea of what the "most
recent transaction" is (if an update's been replicated to node 3 but
not node 5, for instance), how do you maintain the "latest
transaction id" that is consistent across the cluster -- without
forcing all queries or updates through a single machine?
* what would a network diagram look like for a key lookup on "Map Int
Int"? For a range query? For a bulk item update? What's sent over
the wire for each of these (both data & transaction information)?
* what exactly is put to disk, and when? How do you ensure data
consistency across the cluster in the context of shutdown/restart?
Will you be able to support datasets larger than memory, and if so,
how?
* how do the sharding functions change when you add/remove nodes? How
much data do you have to migrate if you add/remove a node, and how
do you migrate it?
> class (Ord k, Eq k) => ObjectId o k where
>     objectId o = k
>
> class (ObjectId o k, Component (c o)) => Shardable (c o) where
>     empty  :: c o
>     divide :: c o -> NumShards -> Index -> c o  -- become the Index shard out of NumShards after split
>     merge  :: [c o] -> c o
>     range  :: c o -> (k, k)
>
> class (ObjectId o k, Bounded k, Shardable (c o)) => ShardUpdateCommand u (c o) where
>     getRanges :: u -> [(k, k)]
>     getRanges u = [(minBound, maxBound)]  -- default is global broadcast
>     -- Note: the semantics of this need cleanup but I hope the idea makes sense
So the difference between an unsharded update and a sharded update is that
a sharded update can designate the range of shards to which it should be
delivered, whereas an unsharded update is delivered to all of them. For
example, a command to insert an object should be delivered only to the
shards that encompass that objectId.
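For a Map keyed by Int, the divide/merge/range trio might look like the following range-partitioned sketch (standalone functions rather than an instance of the class above; names invented):

```haskell
import qualified Data.Map.Strict as M

-- "Become the Index shard out of NumShards": take the idx-th
-- contiguous chunk of the sorted key space.
divideMap :: Int -> Int -> M.Map Int v -> M.Map Int v
divideMap numShards idx m = M.fromList chunk
  where
    kvs   = M.toAscList m
    n     = length kvs
    size  = (n + numShards - 1) `div` numShards  -- ceiling division
    chunk = take size (drop (idx * size) kvs)

-- Reassemble shards into the full component.
mergeMaps :: [M.Map Int v] -> M.Map Int v
mergeMaps = M.unions

-- The key range a shard covers, used to route range-restricted updates.
rangeOf :: M.Map Int v -> Maybe (Int, Int)
rangeOf m
  | M.null m  = Nothing
  | otherwise = Just (fst (M.findMin m), fst (M.findMax m))
```

Contiguous ranges (rather than hashing) keep `range` meaningful, which is what `getRanges` needs for routing.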
> type LogEntry updateCommand = (SequenceNumber, shardUpdateCommand)
The SequenceNumber guarantees that global state depends only on the log
entries and not on the network. So now all queries are with respect to
state as of a given SequenceNumber:
> class (Shardable (c o)) => Query qCommand (c o) r v where
>     qMap    :: (qCommand, SequenceNumber) -> c o -> r
>     qReduce :: [r] -> v
The mechanics of a global query are:
> class (Shardable (c o), Query qCommand (c o) r v, ShardUpdateCommand uCommand (c o))
>     => Transaction qCommand uCommand where
>     tQuery   :: qCommand -> SequenceNumber -> v  -- do the query as of a sequence number
>     tUpdate  :: v -> uCommand                    -- convert the query value to an update command
>     transact :: uCommand -> c o -> c o
So now we want to redefine our transaction log entry as:
> type TLogEntry = (Transaction qCommand uCommand) => (SequenceNumber, (qCommand, uCommand))
Recapping the big ideas: