Streaming Raft

Ben Johnson

Apr 29, 2014, 3:50:20 PM
to raft...@googlegroups.com
Hi everyone-

I've been wanting to use Raft for replicating large datasets and I had
an idea for a tweak to the Raft protocol which could enable this. I'm
interested in getting feedback as to whether this seems feasible.
(Disclaimer: I feel like I may have heard this idea in a blog post
by Ayende Rahien, but I can't seem to find it now.)

The Raft protocol replicates data through the AppendEntries RPC. This
works well but only replicates small chunks at a time and needs to
return before sending more data. It's also attached to the heartbeat so
replicating a large entry can cause an election timeout.

I'm curious whether AppendEntries can be broken into two separate calls.
The first call simply streams entries from the leader to the followers
as quickly as possible. It will have the same handshake that currently
exists in Raft to ensure that it's streaming from the last entry that
the follower has. The follower simply receives these entries and writes
them directly to disk.

The second call would be a checkpoint RPC. This reports back to the
leader what the latest replicated entry is on a follower. This serves
the same function as the response from the current AppendEntries RPC. It
can also function as the heartbeat so that followers can check whether
the leader is still alive but the heartbeat is not dependent on the log
entry size (which could cause election timeouts).

It seems like this setup could provide very high throughput while still
maintaining the spirit of Raft.

I'd love to hear feedback. Thanks!


Ben Johnson

Kelly Sommers

Apr 29, 2014, 5:43:33 PM
to raft...@googlegroups.com
I'm going to whip this up pretty quickly, since I have a blog post draft nearing completion that goes into more detail on dispelling the myth, described by Ayende, that "you need to return before sending more data". This is false, and there are tons of examples in past research and current implementations. Raft and many other linearizable algorithms can pipeline, sending additional requests before previous ones return.

ZAB (ZooKeeper's protocol) does batching and pipelining, and the paper additionally describes A-linearizability. The Raft paper itself mentions the possibility of pipelining but unfortunately doesn't dive into the details. There are also papers that describe pipelining with Paxos.

Pipelining can make a BIG difference. When I benchmarked Haywire with non-pipelined HTTP request/response pairs versus pipelined ones (which still have to return responses in exact order), it was the difference between 99,000 requests/second and 655,000 requests/second, which saturated the Windows Azure network. This is a pretty huge gain.

Your idea of splitting heartbeats from appends may still be worth investigating, but I wanted to remove the perceived problem above from that decision. If you do it, do it for other reasons.

I think pipelining Raft with the AppendEntries(Entry []entries) batching will give you the performance boost you want to replicate a large dataset.
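For illustration, here is a minimal Go sketch of the leader-side bookkeeping that pipelining needs for one follower. The `Pipeline` type, its fields, and the `maxInflight` window are hypothetical names, not taken from any Raft implementation, and the sketch assumes responses come back in the order the requests were sent.

```go
package main

import "fmt"

// Pipeline tracks one follower's replication state on the leader.
// All names here are hypothetical, chosen for this sketch.
type Pipeline struct {
	nextIndex   int   // index of the next log entry to send
	matchIndex  int   // highest index the follower has acknowledged
	maxInflight int   // cap on unacknowledged requests
	inflight    []int // last index carried by each unacked request, oldest first
}

// Send reserves the next n entries for a new AppendEntries request and
// returns the PrevLogIndex to put in it. It never waits for earlier
// requests; it only refuses when the in-flight window is full.
func (p *Pipeline) Send(n int) (prevLogIndex int, ok bool) {
	if n <= 0 || len(p.inflight) >= p.maxInflight {
		return 0, false
	}
	prevLogIndex = p.nextIndex - 1
	p.nextIndex += n
	p.inflight = append(p.inflight, p.nextIndex-1)
	return prevLogIndex, true
}

// Ack records a success response for the oldest in-flight request.
func (p *Pipeline) Ack() {
	if len(p.inflight) == 0 {
		return
	}
	p.matchIndex = p.inflight[0]
	p.inflight = p.inflight[1:]
}

// Reject handles a failed request. Requests sent after it build on entries
// the follower does not have, so the window is dropped and sending resumes
// from the last acknowledged index (a simplification of real back-off).
func (p *Pipeline) Reject() {
	p.inflight = nil
	p.nextIndex = p.matchIndex + 1
}

func main() {
	p := &Pipeline{nextIndex: 1, maxInflight: 2}
	a, _ := p.Send(3)  // entries 1-3
	b, _ := p.Send(2)  // entries 4-5, sent before the first response arrives
	_, ok := p.Send(1) // window is full
	fmt.Println(a, b, ok) // 0 3 false
	p.Ack()
	fmt.Println(p.matchIndex, p.nextIndex) // 3 6
}
```

The window bound is a design choice rather than a protocol requirement: it limits how much has to be resent after a rejection.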

However, if you need really large values, I'm also going to toss another idea around since brainstorming is fun :) 

I'm not sure what terminology to use here, so I'm going to introduce the concept of a logical key and a physical key. One logical key that the application uses to read/write to the database may be represented by multiple physical keys (what actually gets stored in, say, a key/value store).

If you want to store a 100GB file, it's going to be hard to find a key/value store that can take that and store it in one key. Since you're already forced into a multi-part kind of storage where a logical key streams from fetching multiple keys from the key/value store, why not do the same with Raft by replicating multi-part data? This way you can tune the heartbeat buffer size to something that makes better sense (keeping in mind AppendEntries is essentially also a batch). If you use pipelining you can fire off these raft messages that construct a multi-part payload without waiting for responses.
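The multi-part idea can be sketched in Go as below. `Chunk`, `Split`, and `Join` are hypothetical names for this example, and the chunk size is a parameter to tune; each `Chunk` would travel as its own log entry.

```go
package main

import "fmt"

// Chunk is one physical part of a logical value. Each chunk would be
// replicated as its own log entry.
type Chunk struct {
	Key   string // logical key the application sees
	Part  int    // zero-based position of this chunk
	Total int    // number of chunks in the logical value
	Data  []byte
}

// Split cuts value into chunks of at most maxSize bytes. A maxSize below 1
// is treated as 1, and an empty value still yields one (empty) chunk.
func Split(key string, value []byte, maxSize int) []Chunk {
	if maxSize < 1 {
		maxSize = 1
	}
	total := (len(value) + maxSize - 1) / maxSize
	if total == 0 {
		total = 1
	}
	chunks := make([]Chunk, 0, total)
	for i := 0; i < total; i++ {
		start := i * maxSize
		end := start + maxSize
		if end > len(value) {
			end = len(value)
		}
		chunks = append(chunks, Chunk{Key: key, Part: i, Total: total, Data: value[start:end]})
	}
	return chunks
}

// Join reassembles a value and reports false if any part is missing or
// out of order.
func Join(chunks []Chunk) ([]byte, bool) {
	if len(chunks) == 0 || len(chunks) != chunks[0].Total {
		return nil, false
	}
	var out []byte
	for i, c := range chunks {
		if c.Part != i {
			return nil, false
		}
		out = append(out, c.Data...)
	}
	return out, true
}

func main() {
	parts := Split("video", []byte("abcdefghij"), 4)
	for _, c := range parts {
		fmt.Printf("%s %d/%d %q\n", c.Key, c.Part+1, c.Total, c.Data)
	}
	whole, ok := Join(parts)
	fmt.Println(string(whole), ok)
}
```

A reader of the logical key would only treat the value as present once `Join` succeeds, that is, once every part has been committed.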

I'm not saying your idea is bad. I just wanted to clear up the false information that Raft can't pipeline. :)

Ben Johnson

Apr 29, 2014, 6:00:25 PM
to Kelly Sommers, raft...@googlegroups.com
Thanks for the detailed, thoughtful response, Kelly. I didn't quite see the difference between pipelining and streaming but it makes sense now. Pipelining makes a lot more sense and would be easier to implement. I think I'd get the performance wins I'm looking for too.

I like the idea of separating out the heartbeats. Armon's hashicorp/raft implementation separates it out and it seems to make sense.

It makes sense to handle a multi-gigabyte value in multi-part. I was more concerned with multi-megabyte values periodically exceeding an election timeout.

Thanks again!


Ben

Armon Dadgar

Apr 29, 2014, 6:38:16 PM
to Kelly Sommers, raft...@googlegroups.com
Thanks for chiming in, Kelly! I totally agree with everything said so far.

Ben, if there is anything I can do to help get this into hashicorp/raft, let me know!
I think adding pipelining won’t be too challenging but will be a huge win.

Best Regards,
Armon Dadgar

Diego Ongaro

May 6, 2014, 3:28:07 PM
to Armon Dadgar, Kelly Sommers, raft...@googlegroups.com
Sorry about my slow reply; I know some of you have been waiting for
the BGSFL to chime in.

1. Pipelining

Thanks, Kelly, for setting the record straight that Raft very much
allows pipelining of AppendEntries. As long as followers append to
their logs in log order (which is guaranteed by the AppendEntries
consistency check), this is safe. I've got a local branch of LogCabin
that does pipelining, and one day maybe I'll get to write code again
and finish it. The changes were small, but squeezing all the potential
performance out will take some time.
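As a rough Go sketch of why the consistency check keeps pipelining safe: a follower only accepts a request whose predecessor entry it already holds, so a request that overtakes an earlier one is rejected rather than appended out of order. The types below are simplified and hypothetical (leader term, commit index, and persistence are all omitted).

```go
package main

import "fmt"

// Entry is one log entry. Its index is its position in the log, starting at 1.
type Entry struct {
	Term int
	Data []byte
}

// AppendEntries is a stripped-down request for this sketch.
type AppendEntries struct {
	PrevLogIndex int // index of the entry just before Entries (0 = start of log)
	PrevLogTerm  int // term of that entry
	Entries      []Entry
}

// Follower holds the log; log[0] is the entry at index 1.
type Follower struct {
	log []Entry
}

// Handle applies the consistency check and appends on success.
func (f *Follower) Handle(req AppendEntries) bool {
	// Reject if the entry at PrevLogIndex is missing: accepting would
	// leave a gap in the log.
	if req.PrevLogIndex > len(f.log) {
		return false
	}
	// Reject if that entry exists but belongs to a different term.
	if req.PrevLogIndex > 0 && f.log[req.PrevLogIndex-1].Term != req.PrevLogTerm {
		return false
	}
	for i, e := range req.Entries {
		pos := req.PrevLogIndex + i // zero-based slot for this entry
		if pos < len(f.log) {
			if f.log[pos].Term == e.Term {
				continue // already stored, for example a duplicate delivery
			}
			f.log = f.log[:pos] // conflicting suffix: discard it
		}
		f.log = append(f.log, e)
	}
	return true
}

func main() {
	f := &Follower{}
	first := AppendEntries{PrevLogIndex: 0, Entries: []Entry{{Term: 1}, {Term: 1}}}
	second := AppendEntries{PrevLogIndex: 2, PrevLogTerm: 1, Entries: []Entry{{Term: 1}}}
	fmt.Println(f.Handle(second))             // false: entries 1-2 have not arrived yet
	fmt.Println(f.Handle(first))              // true
	fmt.Println(f.Handle(second), len(f.log)) // true 3
}
```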

2. Head-of-line blocking leading to unwanted elections

The concern is that AppendEntries might have to push out such a big
log entry that it might not get done before the election timeout. The
follower will time out, the cluster will lose its leader, and worse
yet, this can repeat forever.

An upper limit on log entry size would help. For example, LogCabin
enforces a 1MB maximum size, and that much should transfer fast enough
on most datacenter networks to avoid timeouts. But let's suppose you
don't want an upper limit.

To address this problem in general, leaders need to be able to get
messages across independently of the size of log entries. Either an
AppendEntries that was forced to contain no entries or a separate
heartbeat message type would work equally well.
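A small Go sketch of the two protocol-level measures mentioned here: capping entry size, and forcing an AppendEntries to carry no entries when a heartbeat is due. The function names and the heartbeat interval are assumptions for illustration, and the cap simply mirrors the 1MB figure quoted for LogCabin. This does not by itself solve transport-level queuing behind a large message already in flight.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// maxEntrySize mirrors the 1MB limit quoted for LogCabin. The right value
// for another system is an assumption to tune.
const maxEntrySize = 1 << 20

var errEntryTooLarge = errors.New("log entry exceeds maxEntrySize")

// checkEntry rejects oversized entries before they reach the log.
func checkEntry(data []byte) error {
	if len(data) > maxEntrySize {
		return errEntryTooLarge
	}
	return nil
}

// nextEntries decides what the next AppendEntries carries. Once
// heartbeatEvery has passed since the last message to this follower, the
// request is forced to be empty, so its size no longer depends on how
// large the pending entries are.
func nextEntries(pending [][]byte, sinceLastSend, heartbeatEvery time.Duration) [][]byte {
	if sinceLastSend >= heartbeatEvery {
		return nil
	}
	return pending
}

func main() {
	big := make([]byte, maxEntrySize+1)
	fmt.Println(checkEntry(big)) // log entry exceeds maxEntrySize

	pending := [][]byte{[]byte("x")}
	fmt.Println(len(nextEntries(pending, 10*time.Millisecond, 50*time.Millisecond))) // 1
	fmt.Println(len(nextEntries(pending, 60*time.Millisecond, 50*time.Millisecond))) // 0
}
```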

3. Separating heartbeats into their own RPC

I think separating heartbeats from AppendEntries into their own RPC is
an aesthetic question. On the one hand, heartbeats are a logically
different function from replicating entries. On the other hand,
heartbeats and AppendEntries share most of the same fields and
processing code.

I think it might just depend on the implementation. In LogCabin, I
think it'd just result in a lot of code duplication to separate
heartbeats out. But in a more dynamically typed language dealing with
JSON, for example, it might be easier to factor out the code common to
both RPCs and might end up being clearer that way. I'll have to look
over hashicorp/raft sometime to see how it works there.

Just my two cents,
Diego

Kelly Sommers

May 6, 2014, 5:01:52 PM
to Diego Ongaro, Armon Dadgar, raft...@googlegroups.com
Diego,

For #2, couldn't multi-part log entries be sent with multiple heartbeats so that you don't suffer from unwanted elections?

Diego Ongaro

May 6, 2014, 5:40:24 PM
to Kelly Sommers, Armon Dadgar, raft...@googlegroups.com
Kelly, yep, you could break large entries down into smaller ones (as
you suggested in your original mail). Unless you need that for some
other reason, though, it might be too much work. If very large values
are common, it might be best to store them elsewhere and just keep a
pointer to them in the Raft log, don't know.
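One way the "pointer in the log" approach might look, sketched in Go. Using a SHA-256 digest as the pointer is an assumption of this example, as are the `BlobStore` and `PointerEntry` names; the in-memory map stands in for whatever external storage would hold the values, and making that storage durable and replicated is outside the sketch.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// BlobStore stands in for external storage that holds large values.
type BlobStore map[string][]byte

// Put stores value under the hex form of its SHA-256 digest and returns
// that digest as the reference.
func (s BlobStore) Put(value []byte) string {
	sum := sha256.Sum256(value)
	ref := hex.EncodeToString(sum[:])
	s[ref] = value
	return ref
}

// PointerEntry is what would go through Raft instead of the value itself.
type PointerEntry struct {
	Key  string // application key
	Ref  string // reference into the blob store
	Size int    // size of the referenced value in bytes
}

// NewPointerEntry writes the value to the store first, then builds the
// small entry to replicate.
func NewPointerEntry(store BlobStore, key string, value []byte) PointerEntry {
	return PointerEntry{Key: key, Ref: store.Put(value), Size: len(value)}
}

func main() {
	store := BlobStore{}
	value := make([]byte, 8<<20) // an 8MB value, too large for a single entry
	e := NewPointerEntry(store, "backup", value)
	fmt.Println(e.Key, len(e.Ref), e.Size, len(store[e.Ref]))
}
```

The replicated entry stays under a hundred bytes or so regardless of the value's size, so it cannot delay heartbeats.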

Nicolas Trangez

May 7, 2014, 5:36:33 PM
to Ben Johnson, raft...@googlegroups.com
I've been wanting to reply to this message for some time now but didn't
get to it yet, so here goes. Some of what follows overlaps somewhat with
what Kelly Sommers & Diego mentioned before.

Increasing throughput in a distributed system is all about reducing
'waiting'. Raft's use of RPC, as the protocol is explained in the
paper, does introduce such waiting. Luckily the protocol doesn't
enforce dependencies between RPC results and the ability to
proceed with further RPC requests, so as Kelly mentioned, pipelining is
possible.

This touches on a subject I discussed with Diego back when the first drafts
of the paper were released. I think it's a mistake to present Raft as an
RPC-based protocol, or certainly to implement it as such, exactly due to
the 'waiting' it introduces in a naive implementation, and the
requirement of RPC infrastructure at the communication layer (which
brings complicated things like timeouts, reconnection management, ... to
the table). Instead I strongly believe using a simple one-way
message-based communication layer which can drop, duplicate or reorder
messages at will makes things much easier to implement, and gets rid of
every possible waiting state. The transformation of Raft from an RPC-based
to a message-based protocol is fairly trivial (it's what I implemented
in Kontiki). If you're interested we could go over the details.

Preventing master timeouts is related to this as well: what you're
experiencing when AppendEntries RPC requests become too large to process
in time on a standard TCP-based RPC transport is the well-known 'head of
line blocking' issue of streaming protocols. There are some ways to get
around this:
- Use a separate TCP channel for master-timeout-preventing (empty)
AppendEntries RPC calls and make sure these are handled promptly
- Use a message-based system where messages don't block one another
(UDP) and receive new messages concurrently while older ones are still
arriving
- Use a networking protocol which allows for separate 'streams' within a
connection (like SCTP does), and use concurrent receiving
Nicolas

jpa...@fssrv.net

Jul 25, 2014, 12:18:02 AM
to raft...@googlegroups.com
Can you explain how it makes sense to separate out heartbeat messages?

Ayende Rahien

Aug 26, 2014, 8:45:17 AM
to raft...@googlegroups.com, benbj...@yahoo.com
Note that putting heartbeats on their own channel is a bad idea.
Consider this scenario: we have a very large entry, one big enough that it can't go through the network (it might be a router issue; if we are using HTTP, it might be a max size issue; maybe your implementation has a bug; it doesn't really matter).
The point is that we have an entry that _cannot_ go through the network. If you have a separate heartbeat channel, you end up in a state where the node is up (heartbeat-wise) but down (entry-processing-wise).
If your entries and heartbeats share the same channel, this will be treated as a failing node.