Abstracting the Raft algorithm in Copycat


Jordan Halterman (kuujo)

Jan 28, 2015, 3:41:27 AM
to raft...@googlegroups.com
TL;DR
https://github.com/kuujo/copycat

Hey all. This is a brief update on Copycat and an in-depth examination of some of the abstractions I've been able to build around the Raft algorithm. I'm hoping that I can get some feedback. Please feel free to point out any issues in my description of Copycat's Raft implementation.

I've mentioned Copycat on this forum a few times. I first became interested in consensus algorithms a little more than a year ago, and I've been working on Copycat ever since. It initially started out as a basic Raft implementation, but as I've identified use cases in my day job it has evolved into a more generally useful distributed coordination framework.

After I built Copycat's original Raft implementation, I realized that many of the components of the algorithm could be encapsulated from one another. A few months ago I set out to build a more widely useful API by abstracting leader election, logging, state management, and other elements of the algorithm from one another. Eventually, Copycat transformed into a multi-module project that includes separate interfaces for leader elections, event logs, state logs, state machines, and distributed collections. Each of these types of resources simply exposes different features of the Raft algorithm in a unique API.

Abstracting portions of the Raft algorithm in order to facilitate such a modular API required a lot of trial and error. Eventually, I was able to create a core implementation of the Raft algorithm that strictly handles communication and logging, is extensible and reusable, and is generic enough to support a variety of features. Higher-level resources wrap the Raft algorithm and expose use-case-specific features such as election information and events, state management, and log compaction.

The Raft implementation
Copycat's core Raft implementation takes the extensible approach of implementing only the basic necessities of the Raft algorithm: voting, logging, and replication. Other tasks such as membership changes and log compaction are delegated to wrapper classes (resources). Additionally, rather than having the algorithm interact directly with any transport layer, Copycat mediates between the Raft algorithm and the transport layer in order to facilitate multiple instances of the algorithm running over the same transport.

At the protocol level, Copycat supports pluggable transports, allowing the framework to be used within a variety of asynchronous environments (actually, this is sort of a legacy feature, as Copycat was originally implemented on the Vert.x event bus).

For the most part, Copycat adheres to the Raft protocol specification, but it does diverge in some areas. Specifically, rather than implementing heartbeats via AppendEntries RPCs, Copycat uses a separate RPC for sending heartbeats when replication is not taking place. This is simply a cleanliness feature and makes no practical difference, since these RPCs carry all the same data except entries.

One change to the protocol that is an optimization is the use of separate RPC types for reads and writes in Copycat. Read and write requests between nodes were separated because internally Copycat allows optional per-request configuration options for reads. Normally, in order to guarantee consistency, Raft dictates that the leader should send a heartbeat to a majority of the cluster to ensure that it has not been deposed before processing read-only requests. Of course, there are some common trade-offs to this frequently futile consistency check, so Copycat exposes several consistency modes for reads (sketched below):
* weak - All reads are immediately evaluated on the local node
* default - When a read request is received, the request will be forwarded to the leader to be evaluated, and the leader's heartbeat is used as a sort of lease timeout
* full - All reads are forwarded to the leader to be evaluated and the leader will synchronously heartbeat a majority of the cluster before processing the request
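
To make the trade-offs concrete, here's a minimal sketch of how a server might dispatch a read under each mode. The names below are simplified for illustration rather than the actual Copycat API:

// Illustrative only: these names are assumptions, not the actual API.
enum ReadConsistency { WEAK, DEFAULT, FULL }

interface Query {}
interface StateMachine { Object apply(Query query); }
interface LeaderProxy {
  Object readWithLease(Query query);            // answers under the heartbeat "lease"
  Object readAfterQuorumHeartbeat(Query query); // heartbeats a majority first
}

class ReadDispatcher {
  private final StateMachine stateMachine;
  private final LeaderProxy leader;

  ReadDispatcher(StateMachine stateMachine, LeaderProxy leader) {
    this.stateMachine = stateMachine;
    this.leader = leader;
  }

  Object read(Query query, ReadConsistency consistency) {
    switch (consistency) {
      case WEAK:
        // Evaluate immediately on the local node; may return stale state.
        return stateMachine.apply(query);
      case DEFAULT:
        // Forward to the leader; it answers as long as its last successful
        // heartbeat round is recent enough to serve as a lease.
        return leader.readWithLease(query);
      case FULL:
        // Forward to the leader; it heartbeats a majority before answering
        // to prove it has not been deposed.
        return leader.readAfterQuorumHeartbeat(query);
      default:
        throw new IllegalArgumentException("Unknown mode: " + consistency);
    }
  }
}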

With Copycat's core Raft implementation complete, I decided to explore some options for expanding the conceivable size of the Copycat cluster. After realizing how simple gossip was compared to Raft, I decided to put some effort into implementing a gossip protocol for replication to large clusters. The concept here is that gossip is used to replicate only committed entries to "passive" members of the cluster. This makes the gossip protocol significantly more lightweight than Raft, since almost no coordination needs to take place between nodes. Once an entry becomes committed on any "active" (Raft voting) member of the cluster, the entry becomes available for replication via the gossip protocol. As with other areas of the framework, the algorithm classes are responsible for communicating entries, while the cluster classes are responsible for managing the dynamic cluster configuration.
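
As a rough sketch of the idea (again with names invented for illustration rather than taken from the actual code), an active member can periodically push batches of already-committed entries to a passive member with no further coordination:

import java.util.List;
import java.util.Random;

interface Entry {}
interface Log { List<Entry> entries(long fromIndex, long toIndex); }
interface PassiveMember {
  long matchIndex();               // highest index known to be replicated to this member
  void send(List<Entry> entries);  // fire-and-forget; retried on the next round
}

class PassiveReplicator {
  private final Log log;
  private final List<PassiveMember> passiveMembers;
  private final Random random = new Random();

  PassiveReplicator(Log log, List<PassiveMember> passiveMembers) {
    this.log = log;
    this.passiveMembers = passiveMembers;
  }

  // Called on a timer. Only entries up to commitIndex are ever sent, so no
  // coordination with other active members is required.
  void gossip(long commitIndex) {
    if (passiveMembers.isEmpty()) return;
    PassiveMember target = passiveMembers.get(random.nextInt(passiveMembers.size()));
    long nextIndex = target.matchIndex() + 1;
    if (nextIndex <= commitIndex) {
      target.send(log.entries(nextIndex, commitIndex));
    }
  }
}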

Copycat's gossip implementation uses vector clocks to keep track of membership and indexes for passive nodes. Whenever an active or passive member communicates with another passive member, they exchange a vector clock of cluster membership and index information. Versions are used to calculate updates for each entry in the vector clock. This ensures that index changes are more quickly propagated throughout the passive cluster members and thus reduces the risk of unnecessary communication.
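
A tiny sketch of that exchange (field and class names are just illustrative): each member keeps a versioned map of membership and index information and merges whatever a peer sends, with the higher version winning:

import java.util.HashMap;
import java.util.Map;

class MemberInfo {
  final long version;  // bumped whenever the member's entry changes
  final long index;    // highest log index known to be replicated to the member
  MemberInfo(long version, long index) { this.version = version; this.index = index; }
}

class MembershipClock {
  // memberId -> latest known (version, index)
  private final Map<String, MemberInfo> members = new HashMap<>();

  // Merge a clock received from a peer: the entry with the higher version wins,
  // so index and membership changes spread quickly through passive members.
  void merge(Map<String, MemberInfo> remote) {
    remote.forEach((id, info) ->
        members.merge(id, info,
            (local, incoming) -> incoming.version > local.version ? incoming : local));
  }

  Map<String, MemberInfo> snapshot() {
    return new HashMap<>(members);
  }
}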

Additionally, the cluster - which handles membership changes and failure detection - uses the gossip protocol to determine the status of unreachable nodes. In order to verify that a passive member has left the cluster or died (it makes no distinction between the two), Copycat requires independent verification from multiple nodes by simply piggybacking "suspicious" counters on exchanged gossip messages. Once enough nodes have marked a member as suspicious it is considered dead and that information is gossiped with the updated passive membership list.
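
The suspicion bookkeeping can be sketched roughly like this (illustrative names only): a set of distinct accusers is tracked per member, and the member is declared dead once the set crosses a threshold:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class FailureDetector {
  private final int suspicionThreshold;  // how many distinct accusers are required
  private final Map<String, Set<String>> suspicions = new HashMap<>();

  FailureDetector(int suspicionThreshold) {
    this.suspicionThreshold = suspicionThreshold;
  }

  // Record that 'accuser' could not reach 'suspect' (learned directly or via a
  // piggybacked gossip message). Returns true once enough distinct members have
  // independently marked the suspect, at which point it is considered dead.
  boolean suspect(String suspect, String accuser) {
    Set<String> accusers = suspicions.computeIfAbsent(suspect, k -> new HashSet<>());
    accusers.add(accuser);
    return accusers.size() >= suspicionThreshold;
  }

  // Clear suspicion when the member is heard from again.
  void unsuspect(String member) {
    suspicions.remove(member);
  }
}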

Logs
As with many other features of Copycat, the Log is highly configurable and extensible. This helps Copycat serve the wide variety of use cases covered by its various resource types. Additionally, Copycat's logs are designed to support fast compaction. Whereas Copycat's original log implementations were simple files, logs are now separated into segments. Because log compaction occurs only on segment rollovers, compacting the Copycat log is as simple as deleting a segment file.
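
A sketch of what segment-based compaction buys (the names below are illustrative, not the actual Log API): because a segment is only ever compacted as a whole, dropping old entries amounts to deleting whole segment files:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.TreeMap;

class SegmentedLog {
  // first index contained in the segment -> segment file on disk
  private final TreeMap<Long, Path> segments = new TreeMap<>();

  void addSegment(long firstIndex, Path file) {
    segments.put(firstIndex, file);
  }

  // Delete every segment whose entries all fall at or below compactIndex,
  // always keeping the newest segment.
  void compact(long compactIndex) throws IOException {
    while (segments.size() > 1) {
      Map.Entry<Long, Path> oldest = segments.firstEntry();
      Long nextFirst = segments.higherKey(oldest.getKey());
      // The oldest segment's last index is one less than the next segment's first index.
      if (nextFirst == null || nextFirst - 1 > compactIndex) {
        break;  // the oldest segment still holds entries above compactIndex
      }
      Files.deleteIfExists(oldest.getValue());
      segments.pollFirstEntry();
    }
  }
}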

State Machines
Copycat does provide a state machine interface. The state machine wraps the core Raft algorithm to provide Raft log based state management and expose distributed state machine proxies. More relevantly, though, the state machine is an excellent example of the modularity of Copycat's Raft implementation. Because of the variety of use cases supported in Copycat, the core Raft algorithm does not implement log compaction. Various use cases can dictate different methods for compacting the log. Event logs often require time or size based compaction, while state machines require more careful management of logs. The state machine wraps Copycat's pluggable logs in a special log that handles snapshots. When the log grows large enough, of course, Copycat serializes a snapshot of the state machine's state and compacts the log.
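
Roughly, the flow looks like this sketch (interfaces invented for illustration): once the log crosses a size threshold, the state machine's state is serialized and persisted as a snapshot, and everything at or below the last applied index is compacted away:

interface SnapshottableStateMachine {
  byte[] takeSnapshot();  // serialize the current state
}

interface CompactableLog {
  long sizeInBytes();
  void storeSnapshot(long index, byte[] snapshot);  // persist before compacting
  void compact(long index);                         // discard entries <= index
}

class SnapshotController {
  private final SnapshottableStateMachine stateMachine;
  private final CompactableLog log;
  private final long maxLogSize;

  SnapshotController(SnapshottableStateMachine stateMachine, CompactableLog log, long maxLogSize) {
    this.stateMachine = stateMachine;
    this.log = log;
    this.maxLogSize = maxLogSize;
  }

  // Called after entries are applied to the state machine.
  void maybeSnapshot(long lastAppliedIndex) {
    if (log.sizeInBytes() >= maxLogSize) {
      byte[] snapshot = stateMachine.takeSnapshot();
      log.storeSnapshot(lastAppliedIndex, snapshot);
      log.compact(lastAppliedIndex);  // entries up to lastAppliedIndex are now redundant
    }
  }
}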

Event Logs
Finally, the feature that I was most interested in when I began my effort to abstract the Raft algorithm was event logging. A few months ago when I was researching Kafka for work, I realized that the algorithm Kafka uses for managing partitions and replication via ZooKeeper seemed eerily similar to Raft. Of course, it's not completely similar - as opposed to a quorum based approach, Kafka synchronously replicates to all "in-sync" replicas, followers pull from the leader, and, of course, it doesn't write to disk - but the challenge of abstracting the Raft algorithm to fit the range between state and event logging intrigued me. This led to the highly configurable logging framework within Copycat. Whereas state logs and state machines frequently flush to disk and perform snapshot based compaction, event logs allow for fast off heap in-memory logs that are compacted only via time and size based compaction policies.
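
For a sense of what a time/size-based policy amounts to, here's a minimal sketch (the names are illustrative, not the actual configuration API): an event log segment may be dropped once it is older than the retention window or the log as a whole has outgrown its size limit:

import java.time.Duration;
import java.time.Instant;

class RetentionPolicy {
  private final Duration maxAge;   // time-based retention window
  private final long maxBytes;     // size-based cap on the whole log

  RetentionPolicy(Duration maxAge, long maxBytes) {
    this.maxAge = maxAge;
    this.maxBytes = maxBytes;
  }

  // A segment may be discarded once it has expired or the log is too large.
  boolean canDiscard(Instant segmentCreated, long totalLogBytes) {
    boolean expired = Duration.between(segmentCreated, Instant.now()).compareTo(maxAge) > 0;
    boolean oversize = totalLogBytes > maxBytes;
    return expired || oversize;
  }
}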

Conclusion
You've probably noticed those last few sections were brief. That's because I pretty much tired myself out. There's still a lot to be done on this project, and it's nowhere near production ready, but I have every intention of getting it to that point. I value the insight of people on this forum, and, as I said, I would be really interested in getting feedback, particularly in relation to the Raft and gossip implementations and the general interest in the project. I promise not to make any further responses quite so long :-)

Oren Eini (Ayende Rahien)

Jan 28, 2015, 5:23:35 AM
to Jordan Halterman (kuujo), raft...@googlegroups.com
>  For the most part, Copycat adheres to the Raft protocol specification, but it does diverge in some areas. Specifically, rather than implementing heartbeats via AppendEntries RPCs, Copycat uses a separate RPC for sending heartbeats when replication is not taking place. This is simply a cleanliness feature and makes no practical difference, since these RPCs carry all the same data except entries.

Note that this is potentially breaking.
Imagine a scenario where A is the leader, and it is trying to send commands to the other nodes. For some reason, it can't write append entries. It can be that the commands are too large, etc.
If the heartbeats go out in this case, the nodes think that A is up and simply has nothing to say.





JP Sugarbroad

Jan 28, 2015, 11:56:24 AM
to Oren Eini (Ayende Rahien), Jordan Halterman (kuujo), raft...@googlegroups.com
On Wed Jan 28 2015 at 2:23:35 AM Oren Eini (Ayende Rahien) <aye...@ayende.com> wrote:
Note that this is potentially breaking.
Imagine a scenario where A is the leader, and it is trying to send commands to the other nodes. For some reason, it can't write append entries. It can be that the commands are too large, etc.
If the heartbeats go out in this case, the nodes think that A is up and simply has nothing to say.

I think that's why it's "when replication is not taking place", i.e. when AppendEntries would not contain any entries.

- JP 

Oren Eini (Ayende Rahien)

Jan 28, 2015, 3:28:49 PM
to JP Sugarbroad, Jordan Halterman (kuujo), raft...@googlegroups.com
The issue is whether this is actually the case.
The details of whether a heartbeat is sent during a failure to send entries are crucial.


Diego Ongaro

Jan 29, 2015, 7:45:19 PM
to Jordan Halterman (kuujo), raft...@googlegroups.com
Well, Jordan, I think you might have just become the world's leading
expert on the topic of modular Raft implementations. Congrats.

The gossip stuff sounds pretty cool. How big of a cluster were you
able to run that way?

I'd be interested in hearing more about the "event logs" topic. I've
also wondered about the similarities of log replication in Raft and
primary-backup (Kafka-like) systems (mentioned briefly in the related
work in my dissertation, p167-8). If you had a tiered system with a
Raft cluster at the core and primary-backup clusters handling most of
the requests, how much of the log replication logic would it make
sense for the two types of clusters to share? Seems to me like
AppendEntries may be slightly overkill for primary-backup, though not
by much.

-Diego


Archie Cobbs

Mar 5, 2015, 6:40:40 PM
to raft...@googlegroups.com
On Wednesday, January 28, 2015 at 2:41:27 AM UTC-6, Jordan Halterman (kuujo) wrote:
 
Hi Jordan,

I started looking at Raft just yesterday, and then your copycat Java library. Both are very impressive... though I'm still trying to fully comprehend it all...

And I'm already wondering about how to make this stuff even easier to use :)

Raft provides a consensus sequential log abstraction; on top of that copycat provides a state machine abstraction. It seems like the next step in this evolution is a transaction abstraction.

Now obviously you can bundle up a bunch of state machine events, call them a single "multi-event", and get a "transaction" from that single combined event. But it would be nice if the copycat API provided this kind of functionality explicitly - for the same reason copycat provides state machine, collection, etc. abstractions.

A little background: what I really want is a transactional, distributed key/value database for a clustered Java application. These are the requirements...
  • Fully consistent, ACID transactions
  • Relatively small amount of data (can fit in memory; occasional snapshot network transfers are acceptable)
  • A moderate rate of short duration transactions (i.e., we don't need no fancy MVCC)
  • Understandable, maintainable code
I already have a Java persistence layer that can work on top of a key/value store. Copycat provides the consistency and clustering. So all that's left is the transactional part.

The primary goals are robustness, correctness, and code clarity/maintainability; performance should be good but will not be a showstopper issue here (not trying to reinvent Oracle). In particular, that last list item is why I'm here in the first place. The explicit goal of Raft to make implementation understandable really resonates with my own experience... (stated another way, complexity kills). Toward that same goal, it would be nice to allow the standard "transactional programming thought process" to be applied to the clustered state, instead of having to think in terms of state transitions only.

So I started thinking about how to add a simple transactional ACID database layer to copycat. In my case, the state machine is a key/value store (Map) and state machine events are mutations on the map. I'd like to be able to open a transaction, do some stuff, then commit it all at once.

First, my understanding of copycat's state machine API is: user supplies the state machine (typically it is just some object living in memory); copycat tells you when to query its state, apply change events to its state, do a wholesale snapshot import, or do a wholesale snapshot export. Copycat also needs to know how to (de)serialize change events and snapshots (so it can (de)persist them). Otherwise copycat knows nothing else about the state machine itself. But of course copycat does all the Raft logic to keep that state machine consistent with the rest of the cluster. Copycat doesn't apply changes to the state machine until they are committed by Raft.

Now... to add transactions, we first assume that the underlying state machine (supplied by the user) also has a simple mark() and reset() capability (like InputStream). Note that in order to safely use this functionality for transactions, copycat would be required to serialize all transactions on an individual node. This doesn't seem like an unreasonable requirement, because with Raft these transactions are going to ultimately be serialized anyway.

When a transaction is opened, you invoke mark() on the state machine and record the current commit index corresponding to its current state. For transaction reads, you just read from the state machine; for transaction writes, you write to the state machine, and also add that write to a mutation list. When it's time to commit the transaction, you first commit the entire mutation list (all-or-none) to the Raft cluster - this would be a single Raft commit; copycat would aggregate the mutation list into a single Raft mutation/commit. Then if the Raft commit succeeds, you leave the state machine as it is and the transaction succeeds; if it fails, you reset() the state machine transaction and the transaction fails.

There is one more detail: we don't have atomicity yet, because another node might make a commit in the middle of the first node's transaction. To fix this, copycat would perform a final consistent read from the leader to ensure that the current commit index has not advanced from the value recorded at the beginning of the transaction. If it has, you reset() and throw a "retry transaction" exception. This final read is the only one that has to travel over the network - in fact, no network I/O occurs at all until the transaction commits.

Small optimizations:
  • If the mutation list is empty at the end of the transaction (i.e., the transaction was read-only), then there's no need to do any Raft commit or network I/O at all; we're immediately done.
  • If we get a heartbeat during the transaction with a higher commit index, immediately mark transaction as failed (throw a retry exception on the next operation)
  • Combine the final consistent read and the actual commit in a single message; the leader would do the commit index check for us and return an error if it failed
This kind of "simplified transactions" layer would not be suitable in a high-contention transaction environment (but then again, perhaps neither is Raft itself). But for use cases like the one described it seems like a useful and fairly trivial addition that would promote the goal that motivated Raft in the first place, i.e., making clustered operations easier to understand and implement.
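
To make the proposal concrete, here's a minimal sketch of the whole flow, using the combined check-and-commit from the last optimization above. Every name here is hypothetical - it's just the shape of the idea, not a proposed copycat API:

import java.util.ArrayList;
import java.util.List;

interface Mutation {}
interface MarkableStateMachine {
  void mark();                 // remember the current state for a possible rollback
  void reset();                // roll back to the marked state
  Object read(Object key);
  void apply(Mutation m);
}
interface RaftClient {
  long localCommitIndex();     // commit index of the locally applied state
  // Atomically: append the mutations as one entry iff the log has not advanced
  // past baseIndex; returns false (and appends nothing) otherwise.
  boolean commitIfUnchanged(long baseIndex, List<Mutation> mutations);
}
class RetryTransactionException extends Exception {}

class OptimisticTransaction {
  private final MarkableStateMachine sm;
  private final RaftClient raft;
  private final long baseCommitIndex;
  private final List<Mutation> mutations = new ArrayList<>();

  OptimisticTransaction(MarkableStateMachine sm, RaftClient raft) {
    this.sm = sm;
    this.raft = raft;
    sm.mark();
    this.baseCommitIndex = raft.localCommitIndex();
  }

  Object read(Object key) {
    return sm.read(key);               // plain read of the local state machine
  }

  void write(Mutation mutation) {
    sm.apply(mutation);                // optimistic local apply
    mutations.add(mutation);           // remembered for the single Raft commit
  }

  void commit() throws RetryTransactionException {
    if (mutations.isEmpty()) {
      return;                          // read-only: no Raft commit, no network I/O
    }
    // The only network round trip: one combined check-and-commit at the leader.
    if (!raft.commitIfUnchanged(baseCommitIndex, mutations)) {
      sm.reset();                      // roll back the locally applied writes
      throw new RetryTransactionException();
    }
  }
}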

Thoughts?

Diego Ongaro

Mar 10, 2015, 4:35:36 PM
to Archie Cobbs, raft...@googlegroups.com, Jordan Halterman
Hi Archie,

It sounds to me like you might be describing a form of optimistic
concurrency control (see
https://en.wikipedia.org/wiki/Optimistic_concurrency_control and
minitransactions in Sinfonia
http://www.mshah.org/papers/sosp_2007_aguilera.pdf for some related
reading). I think requiring the commitIndex to not change at all
during a transaction may lead to too many aborts, and it's worth
thinking about other options there.

Also, I still don't see how this requires changes at the Raft level or
changes to copycat's API. For example, if mark() and reset() are
indeed the right operations, can they be state machine commands from
the Raft log? Remember, a state machine can take a commit operation,
decide that its conditions haven't been met, and make no state changes
as a result (just as long as this is deterministic).

-Diego

Archie Cobbs

Mar 10, 2015, 5:34:19 PM
to raft...@googlegroups.com, archie...@gmail.com, jordan.h...@gmail.com
Hi Diego,


On Tuesday, March 10, 2015 at 3:35:36 PM UTC-5, Diego Ongaro wrote:
It sounds to me like you might be describing a form of optimistic
concurrency control (see
https://en.wikipedia.org/wiki/Optimistic_concurrency_control and
minitransactions in Sinfonia
http://www.mshah.org/papers/sosp_2007_aguilera.pdf for some related
reading). I think requiring the commitIndex to not change at all
during a transaction may lead to too many aborts, and it's worth
thinking about other options there.

I agree this would only be practical in a low write contention situation.

You'd be betting that most of the time you can (a) start performing some computation based on the current committed state of the FSM, and when ready, (b) commit a new state to the FSM, all before any other node has had a chance to advance the FSM past the state you observed at point (a).

The commit message to the leader would in effect say, "Apply this state change, but only if the FSM has not advanced beyond index X".

So it is like Sinfonia minitransactions, in that you have a "compare and swap" operation.
 
Also, I still don't see how this requires changes at the Raft level or
changes to copycat's API. For example, if mark() and reset() are
indeed the right operations, can they be state machine commands from
the Raft log? Remember, a state machine can take a commit operation,
decide that its conditions haven't been met, and make no state changes
as a result (just as long as this is deterministic).
 
I agree it does not require any Raft-level changes. This idea would be more of an API convenience, so developers can think in terms of "transactions" instead of state machine transitions.

As for the copycat API, it's maybe not strictly required, but to me it would make sense to add it not as a change to the existing API but as another "convenience" layer on top of the existing FSM layer... just as the FSM layer is a "convenience" layer on top of the consensus log layer.

So this all boils down to an API design question more than a Raft/consensus theory question. But API design is important when you're talking about trying to make something actually useful in practice.

Ideally, a developer wants to be able to think in these terms:
  1. Open an ACID-compliant transaction on the cluster
  2. Do some arbitrary computation and mutation of the shared cluster state
  3. Commit the transaction (if possible) and go back to sleep knowing all is well
Yes you can recode those steps in terms of FSM transitions but that would be an example of exactly the kind of tedium we are trying to avoid by using computers in the first place :)

Re: moving the mark()/reset() into the committed FSM... Seems like that would work fine, though it's more heavyweight... there's more network traffic, and you pay the cost of a Raft commit even if the "transaction" were rolled back. But it could open the possibility for more contention tolerance.

Doing mark()/reset() on the local node only is more efficient... but of course, your write transactions have to be infrequent enough to make the optimistic locking have a chance of succeeding.

So in summary, this idea definitely lives on the "very simple and lightweight, but limited in how much write contention it can handle" end of the "transactional" spectrum. E.g., it would be suitable for some small but critical shared configuration state that is read often and written infrequently (actually, that's a pretty common use case). Its main advantage would be extreme simplicity, and its main disadvantage poor performance under write contention. As you move away from this end of the spectrum, you trade off those two properties.

On the other end of the spectrum, there's no reason you couldn't have an FSM that is itself a whole transactional database. Then you would send it begin(), read(), write(), commit(), rollback(), etc. events over Raft. It could implement whatever fancy form of MVCC it wanted to. Probably would be a little more complicated to implement though :)

-Archie





Terry Tan

Aug 5, 2016, 12:26:00 AM
to raft-dev
Hey,

I have gone through your project and found a question: you did not use a lock or anything to keep the order.
If two clients have the same index, which one will be executed and which one will be discarded?

On Friday, March 6, 2015 at 7:40:40 AM UTC+8, Archie Cobbs wrote:

Archie Cobbs

Aug 5, 2016, 10:56:25 AM
to raft-dev
Hi Terry,


On Thursday, August 4, 2016 at 11:26:00 PM UTC-5, Terry Tan wrote:
I have gone through your project and found a question: you did not use a lock or anything to keep the order.
If two clients have the same index, which one will be executed and which one will be discarded?

The design discussed in this thread is somewhat obsolete.

The way it works now is that each client transaction is based on a certain log entry, its "base index". The client sees the state machine as of that log entry. Mutations are collected locally during the transaction, and then at commit time the base index, any mutations, and ranges of keys read during the transaction are sent to the leader; the latter is used for conflict detection.

The leader checks the current log index. If the Raft log has advanced past the transaction's base index, the leader checks for possible conflicts. A conflict is a mutation, in some subsequent log entry, of any key that falls within any of the transaction's read ranges. If there are no conflicts, then it's safe to automatically advance the base index to the current log index (i.e., there are no differences between the original base index and the new one when considering only those keys actually read during the transaction). Then the leader adds the mutations to the log and replies with this new log index. The client then waits for that log index to be committed (in the Raft sense) before considering the transaction committed.
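
A sketch of that leader-side check (the types here are hypothetical, not the actual code):

import java.util.List;

interface Mutation { byte[] key(); }
interface KeyRange { boolean contains(byte[] key); }
interface LogEntry { List<Mutation> mutations(); }
interface RaftLog {
  long lastIndex();
  LogEntry entry(long index);
  long append(List<Mutation> mutations);  // returns the new entry's index
}

class LeaderCommitCheck {

  // Returns the log index assigned to the transaction's mutations, or -1 if a
  // conflict was detected and the client should retry.
  long tryCommit(RaftLog log, long baseIndex, List<KeyRange> readRanges, List<Mutation> mutations) {
    for (long i = baseIndex + 1; i <= log.lastIndex(); i++) {
      for (Mutation m : log.entry(i).mutations()) {
        for (KeyRange range : readRanges) {
          if (range.contains(m.key())) {
            return -1;  // another transaction wrote a key this one read
          }
        }
      }
    }
    // No conflicts: the transaction can be rebased onto the current log.
    return log.append(mutations);
  }
}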

This is a simple MVCC scheme that allows transactions to run in parallel as long as there are no conflicts. If a conflict occurs, the client gets a retry exception.

It should work OK except in situations where there are a lot of parallel transactions reading and writing the same keys. If that happens, you'll get a bunch of retry exceptions. A different scheme (e.g., key range locking) would just slightly delay some transactions instead of making them start over. So it's not appropriate for every type of workload.

-Archie

Archie Cobbs

Aug 5, 2016, 2:55:26 PM
to raft-dev
Hi Terry,

I realize now you were (probably?) asking about copycat, not jsimpledb. Sorry about that; if so, you can ignore my reply.

-Archie

jordan.h...@gmail.com

Aug 5, 2016, 5:21:52 PM
to raft...@googlegroups.com
This thread is also really out of date in terms of Copycat. You should read the extensive documentation of Copycat's Raft implementation on the Atomix website: http://atomix.io/copycat/docs

There is no lock used in the Copycat server because the portions of the server that write to the log are single threaded. The Netty Transport ensures that requests from all clients are serialized on the same thread when writing to the log, so no locks are used anywhere. I've been a major contributor in the past to asynchronous frameworks (Vert.x), and Copycat uses the same model to avoid locking altogether and enforce the order that is necessary for requests. Several single-threaded event loops and queues remove the need for locks. 

No request will ever be discarded. Indexes are not associated with clients, they're associated with specific points in the Raft log and the state machine. If two clients make concurrent write requests, the request that is serialized first will be written to the log first and receive a lower index, and the request that's serialized second will receive a higher index. If two clients make concurrent read requests (which are not written to the log) and no concurrent write occurs, both requests will occur at the same index in the state machine (unless the read is on a follower which could advance its index between the two requests), but that does not break any guarantee.
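
The pattern is roughly the following sketch (invented names, not the actual classes): any number of I/O threads hand work to one single-threaded executor, which is the only thread that touches the log, so appends are ordered without locks:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class LogWriter {
  private final ExecutorService logThread = Executors.newSingleThreadExecutor();
  private long nextIndex = 1;  // only ever touched from logThread, so no lock is needed

  // May be called concurrently from any I/O thread; the single-threaded executor
  // serializes the appends and assigns monotonically increasing indexes.
  CompletableFuture<Long> append(byte[] entry) {
    return CompletableFuture.supplyAsync(() -> {
      long index = nextIndex++;
      // ... write 'entry' to the log at 'index' here ...
      return index;
    }, logThread);
  }
}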

Philip Haynes

Aug 5, 2016, 11:16:41 PM
to raft-dev
Hi Jordan,

Our team initially reviewed Copycat as an option to implement RAFT as part of our project before deciding to develop our own. 
The project seems to have made very good progress since then. 

The design of a platform is often a matter of taste. That said, I was concerned at the time that CopyCat was
too feature-rich, with too many dependencies, to allow us to achieve our correctness and performance goals.

Looking at our own implementation journey towards production, the amount of effort between "it works" (on my laptop...) and
the QA team considering signing it off to process millions of client transactions per day with sub-ms latencies has been surprising
to me (>9 person-months with more to come). This reality has forced us to sharpen our pencil on the features we support
and those we don't. For example, our implementation uses multicast, and the initial version had the concept of a passive follower to
expand the cluster - a feature I recently took out, as it introduced a bunch of failure edge cases that were not justified by the QA cost.

Where you take CopyCat depends on your goals for the project - as a research platform for distributed consensus, or something that
can be trusted within certain parameters to be used in production. If it is the latter, based on our experience I would not underestimate the
QA effort, and I would focus on delivering CopyCat's core promises.

HTH.
Kind Regards,
Philip



Heng Chang

Nov 10, 2016, 8:00:03 AM
to raft-dev

Dear Jordan Halterman,


I noticed that you had discussed implementing a gossip protocol for replication to large clusters in Copycat, and I really think this is a great idea.

But it seems that you have implemented it with a different approach, according to the documentation on the Atomix website: http://atomix.io/copycat/docs/membership/#passive-members-

What is the status of the gossip protocol?

Heng Chang.
