[PATCH v2 1/9] system_keyspace: persistence for Raft Group 0 id and Raft Server Id

Konstantin Osipov <kostja@scylladb.com>
Jul 24, 2021, 7:13:17 AM
to scylladb-dev@googlegroups.com, Konstantin Osipov
Implement system_keyspace helpers to persist Raft Group 0 id
and Raft Server id.
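
For illustration, a minimal sketch (not part of this patch) of how a
caller could make sure a fresh server ends up with a persistent Raft
server id; utils::make_random_uuid() is assumed to be available, as
elsewhere in the tree:

    // Sketch only: generate and persist a Raft server id once,
    // then reuse it on every subsequent boot.
    future<utils::UUID> ensure_raft_server_id() {
        auto id = co_await db::system_keyspace::get_raft_server_id();
        if (id == utils::UUID{}) {   // the getter returns a null UUID when unset
            id = utils::make_random_uuid();
            co_await db::system_keyspace::set_raft_server_id(id);
        }
        co_return id;
    }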
---
db/system_keyspace.hh | 13 ++++++++++++
db/system_keyspace.cc | 48 +++++++++++++++++++++++++++++++++++--------
2 files changed, 52 insertions(+), 9 deletions(-)

diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh
index 247af0774a..1eef66a687 100644
--- a/db/system_keyspace.hh
+++ b/db/system_keyspace.hh
@@ -651,5 +651,18 @@ future<std::optional<cdc::generation_id>> get_cdc_generation_id();
future<bool> cdc_is_rewritten();
future<> cdc_set_rewritten(std::optional<cdc::generation_id_v1>);

+// Load Raft Group 0 id from scylla.local
+future<utils::UUID> get_raft_group0_id();
+
+// Load this server id from scylla.local
+future<utils::UUID> get_raft_server_id();
+
+// Persist Raft Group 0 id. Should be a TIMEUUID.
+future<> set_raft_group0_id(utils::UUID id);
+
+// Called once at fresh server startup to make sure every server
+// has a Raft ID
+future<> set_raft_server_id(utils::UUID id);
+
} // namespace system_keyspace
} // namespace db
diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc
index 329e400604..b03c6e4a2d 100644
--- a/db/system_keyspace.cc
+++ b/db/system_keyspace.cc
@@ -1577,19 +1577,31 @@ template future<> update_peer_info<sstring>(gms::inet_address ep, sstring column
template future<> update_peer_info<utils::UUID>(gms::inet_address ep, sstring column_name, utils::UUID);
template future<> update_peer_info<net::inet_address>(gms::inet_address ep, sstring column_name, net::inet_address);

-future<> set_scylla_local_param(const sstring& key, const sstring& value) {
+template <typename T>
+future<> set_scylla_local_param_as(const sstring& key, const T& value) {
sstring req = format("UPDATE system.{} SET value = ? WHERE key = ?", SCYLLA_LOCAL);
- return qctx->execute_cql(req, value, key).discard_result();
+ auto type = data_type_for<T>();
+ return qctx->execute_cql(req, type->to_string_impl(data_value(value)), key).discard_result();
}

-future<std::optional<sstring>> get_scylla_local_param(const sstring& key){
+
+template <typename T>
+future<std::optional<T>> get_scylla_local_param_as(const sstring& key){
sstring req = format("SELECT value FROM system.{} WHERE key = ?", SCYLLA_LOCAL);
- return qctx->execute_cql(req, key).then([] (::shared_ptr<cql3::untyped_result_set> res) {
- if (res->empty() || !res->one().has("value")) {
- return std::optional<sstring>();
- }
- return std::optional<sstring>(res->one().get_as<sstring>("value"));
- });
+ ::shared_ptr<cql3::untyped_result_set> res = co_await qctx->execute_cql(req, key);
+ if (res->empty() || !res->one().has("value")) {
+ co_return std::optional<T>();
+ }
+ auto type = data_type_for<T>();
+ co_return value_cast<T>(type->deserialize(type->from_string(res->one().get_as<sstring>("value"))));
+}
+
+future<> set_scylla_local_param(const sstring& key, const sstring& value) {
+ return set_scylla_local_param_as<sstring>(key, value);
+}
+
+future<std::optional<sstring>> get_scylla_local_param(const sstring& key){
+ return get_scylla_local_param_as<sstring>(key);
}

future<> update_schema_version(utils::UUID version) {
@@ -2291,6 +2303,24 @@ future<> delete_paxos_decision(const schema& s, const partition_key& key, const
).discard_result();
}

+future<utils::UUID> get_raft_group0_id() {
+ auto opt = co_await get_scylla_local_param_as<utils::UUID>("raft_group0_id");
+ co_return opt.value_or<utils::UUID>({});
+}
+
+future<utils::UUID> get_raft_server_id() {
+ auto opt = co_await get_scylla_local_param_as<utils::UUID>("raft_server_id");
+ co_return opt.value_or<utils::UUID>({});
+}
+
+future<> set_raft_group0_id(utils::UUID uuid) {
+ return set_scylla_local_param_as<utils::UUID>("raft_group0_id", uuid);
+}
+
+future<> set_raft_server_id(utils::UUID uuid) {
+ return set_scylla_local_param_as<utils::UUID>("raft_server_id", uuid);
+}
+
} // namespace system_keyspace

sstring system_keyspace_name() {
--
2.25.1

Konstantin Osipov <kostja@scylladb.com>
Jul 24, 2021, 7:13:17 AM
to scylladb-dev@googlegroups.com, Konstantin Osipov
**Do not merge. Backward compatibility is broken;
fails to build with the current build toolchain.**

To ensure consistency of schema and topology changes,
Scylla needs linearizable storage for this data,
available at every member of the database cluster.

The goal of this series is to introduce such storage as a service,
available to all Scylla subsystems. Using this service, any
internal service, such as gossip or migrations (schema), could
persist changes to cluster metadata and expect this to be done in
a consistent, linearizable way.

The series uses the built-in Raft library to implement a
dedicated Raft group, running on shard 0, which includes all
members of the cluster (group 0). It adds hooks to topology change
events, such as adding or removing cluster nodes, to update
group 0 membership, and ensures the group is started when the
server boots.

The state machine for the group, i.e. the actual storage
for cluster-wide information, still remains a stub. Extending
it to actually persist changes to the schema or the token ring
is the subject of a subsequent series.

Group 0 is a distinct service. Another Raft-related service
was implemented earlier: the Raft Group Registry. The purpose of the
registry is to allow Scylla to have an arbitrary number of groups,
each with its own subset of cluster members and its own state
machine, all sharing a common transport. Group 0 is one (the first)
group among many.

Outstanding issues:

- Group registry and group 0 boot order is fixed, but shutdown
order is not. We should shut down group 0 explicitly,
before shutting down the group registry. When should it happen?
- We need to implement a local storage proxy to pass into
raft_schema_state_machine to break the shutdown order
dependency loop.
- Hide the work under --experimental-features=raft-schema.
- Perhaps we should run discovery independently of Raft config
changes, at server start, then init the group registry,
then start group 0.
- Moving server::cas() out of raft is not done. I still
dislike the idea; it's hard to see how it works out
before we add timeouts to server::add_entry() and
server::set_configuration(). So perhaps we should add
the timeouts first.

Changes in v2:
* split group0 away from group service
* discovery moved to service/raft

Changes since RFC:
* it actually works
* various RFC input incorporated

Changes still not done:
* refactoring of header files @gleb
* testing cluster restart (including some nodes missing), nodetool
  removenode, replacenode
* testing with dtest

Branch URL: dev/raft-group-0-rebase

Depends on: dev/raft-group-0-system-local

Konstantin Osipov (9):
system_keyspace: persistence for Raft Group 0 id and Raft Server Id
system_keyspace: mark scylla_local table as always-sync commitlog
raft: (discovery) introduce leader discovery state machine
raft: (server) implement taking initial snapshot at start
raft: (server) implement raft::server::cas()
raft: (server) implement id() helper
raft: (service) break a dependency loop
raft: (service) copy captured lambda to coroutine frame
raft: (service) manage Raft configuration during topology changes

configure.py | 7 +
db/system_keyspace.hh | 15 +-
idl/group0.idl.hh | 43 +++
message/messaging_service.hh | 18 +-
raft/raft.hh | 4 +
raft/server.hh | 54 +++-
service/raft/discovery.hh | 128 ++++++++
service/raft/messaging.hh | 63 ++++
service/raft/raft_address_map.hh | 19 ++
service/raft/raft_gossip_failure_detector.hh | 8 +-
service/raft/raft_group0.hh | 88 ++++++
service/raft/raft_group_registry.hh | 105 +++----
service/raft/raft_rpc.hh | 10 +-
service/storage_service.hh | 6 +-
db/system_keyspace.cc | 50 ++-
main.cc | 16 +-
message/messaging_service.cc | 42 +++
raft/server.cc | 29 +-
service/raft/discovery.cc | 146 +++++++++
service/raft/raft_gossip_failure_detector.cc | 7 +-
service/raft/raft_group0.cc | 301 +++++++++++++++++++
service/raft/raft_group_registry.cc | 179 +++++++----
service/raft/raft_rpc.cc | 21 +-
service/raft/raft_sys_table_storage.cc | 5 +-
service/storage_service.cc | 52 +++-
test/boost/gossip_test.cc | 3 +-
test/lib/cql_test_env.cc | 18 +-
test/manual/gossip.cc | 2 +-
test/raft/discovery_test.cc | 111 +++++++
29 files changed, 1361 insertions(+), 189 deletions(-)
create mode 100644 idl/group0.idl.hh
create mode 100644 service/raft/discovery.hh
create mode 100644 service/raft/messaging.hh
create mode 100644 service/raft/raft_group0.hh
create mode 100644 service/raft/discovery.cc
create mode 100644 service/raft/raft_group0.cc
create mode 100644 test/raft/discovery_test.cc

--
2.25.1

Konstantin Osipov <kostja@scylladb.com>
Jul 24, 2021, 7:13:18 AM
to scylladb-dev@googlegroups.com, Konstantin Osipov
The scylla_local table is infrequently updated (typically once at start)
but stores critical state for this instance's survival (Raft Group 0 id,
Raft server id, sstable format), so always write it to the commit log
in sync mode.
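
For context, is_extra_durable() in db/system_keyspace.hh is expected to
be a plain membership check over this list; a sketch, under that
assumption (the real implementation in db/system_keyspace.cc may differ
in details):

    bool is_extra_durable(const sstring& name) {
        for (const char* table : extra_durable_tables) {
            if (name == table) {
                return true;   // writes to this table wait for commitlog sync
            }
        }
        return false;
    }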
---
db/system_keyspace.hh | 2 +-
db/system_keyspace.cc | 2 ++
2 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh
index 1eef66a687..5167208119 100644
--- a/db/system_keyspace.hh
+++ b/db/system_keyspace.hh
@@ -144,7 +144,7 @@ static constexpr auto FUNCTIONS = "schema_functions";
static constexpr auto AGGREGATES = "schema_aggregates";
}

-static constexpr const char* extra_durable_tables[] = { PAXOS };
+static constexpr const char* extra_durable_tables[] = { PAXOS, SCYLLA_LOCAL };

bool is_extra_durable(const sstring& name);

diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc
index b03c6e4a2d..6b9f30d4b2 100644
--- a/db/system_keyspace.cc
+++ b/db/system_keyspace.cc
@@ -619,6 +619,8 @@ static schema_ptr large_cells() {
"Scylla specific information about the local node"
));
builder.set_gc_grace_seconds(0);
+ // Raft Group id and server id updates must be sync
+ builder.set_wait_for_sync_to_commitlog(true);
builder.with_version(generate_schema_version(builder.uuid()));
return builder.build(schema_builder::compact_storage::no);
}();
--
2.25.1

Konstantin Osipov <kostja@scylladb.com>
Jul 24, 2021, 7:13:19 AM
to scylladb-dev@googlegroups.com, Konstantin Osipov
A Raft instance should never run without a snapshot and configuration,
so implement a way to take a snapshot for new servers before
they start communication.

This is also necessary to make creation of a Raft instance idempotent
during Scylla cluster discovery. Imagine a fresh cluster setup of two
nodes {A, B}, with a single seed pointing to node A.
Node A boots, creates a Raft group id and an initial Raft configuration
with {A} in it, but fails to persist the snapshot. Node B learns the
group id from A and persists it locally.
Then A restarts, discovers no persistent state for its Group 0, and creates
a new group 0 id and a new group 0 snapshot, leaving A and B with
conflicting ideas of what group 0 is.
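
A hedged sketch of the intended call order for a brand-new group member;
make_group0_server() and initial_cfg are placeholders for code added
later in this series:

    future<> start_new_group0_member(raft::configuration initial_cfg) {
        std::unique_ptr<raft::server> srv = make_group0_server();
        // Idempotent: does nothing if a snapshot is already persisted,
        // so it is safe to call on every boot, before start().
        co_await srv->bootstrap(std::move(initial_cfg));
        co_await srv->start();
    }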
---
raft/server.hh | 6 ++++++
raft/server.cc | 17 +++++++++++++++++
2 files changed, 23 insertions(+)

diff --git a/raft/server.hh b/raft/server.hh
index 80d7fcc7e3..50b80188fa 100644
--- a/raft/server.hh
+++ b/raft/server.hh
@@ -84,6 +84,12 @@ class server {
// Return the currently known configuration
virtual raft::configuration get_configuration() const = 0;

+ // Take the first state machine snapshot and persist it
+ // along with the initial configuration of a new Raft group.
+ // To be called before start for a new group.
+ // Do nothing if the snapshot already exists (idempotency).
+ virtual future<> bootstrap(raft::configuration initial_configuration) = 0;
+
// Load persisted state and start background work that needs
// to run for this Raft server to function; The object cannot
// be used until the returned future is resolved.
diff --git a/raft/server.cc b/raft/server.cc
index 6d045ccfa3..0ba61e5dbd 100644
--- a/raft/server.cc
+++ b/raft/server.cc
@@ -66,6 +66,7 @@ class server_impl : public rpc_server, public server {
future<snapshot_reply> apply_snapshot(server_id from, install_snapshot snp) override;
future<> set_configuration(server_address_set c_new) override;
raft::configuration get_configuration() const override;
+ future<> bootstrap(raft::configuration initial_configuration) override;
future<> start() override;
future<> abort() override;
term_t get_current_term() const override;
@@ -242,6 +243,22 @@ server_impl::server_impl(server_id uuid, std::unique_ptr<rpc> rpc,
}
}

+future<> server_impl::bootstrap(raft::configuration initial_configuration) {
+ auto snapshot = co_await _persistence->load_snapshot();
+
+ if (snapshot.id) {
+ co_return;
+ }
+ // We have no snapshot for this server yet. Store the
+ // first one - the state of the server must be persistent
+ // from the start.
+ snapshot.id = co_await _state_machine->take_snapshot();
+ snapshot.term = term_t{};
+ snapshot.idx = raft::index_t{};
+ snapshot.config = std::move(initial_configuration);
+ co_await _persistence->store_snapshot(snapshot, _config.snapshot_trailing);
+}
+
future<> server_impl::start() {
auto [term, vote] = co_await _persistence->load_term_and_vote();
auto snapshot = co_await _persistence->load_snapshot();
--
2.25.1

Konstantin Osipov <kostja@scylladb.com>
Jul 24, 2021, 7:13:19 AM
to scylladb-dev@googlegroups.com, Konstantin Osipov
Introduce a special state machine used to find
a leader of an existing Raft cluster or create
a new cluster.

This state machine should be used when a new
Scylla node has no persisted Raft Group 0 configuration.

The algorithm is initialized with a list of seed
IP addresses, the IP address of this server, and
this server's Raft server id.

The IP addresses are used to construct an initial list of peers.

The algorithm then tries to contact each peer (excluding self) from
its peer list, sharing its own peer list with that peer and
obtaining the peer's peer list in return. If the peer is already part
of some Raft cluster, this information is shared as well. On a response
from a peer, this server's peer list is updated. The
algorithm stops when all peers have exchanged peer information or
one of the peers responds with the id of a Raft group and the Raft
server address of the group leader.

(If any of the peers fails to respond, the algorithm retries
ad infinitum with a timeout.)

More formally, the algorithm stops when one of the following is true:
- it finds an instance with an initialized Raft Group 0 that has a leader
- all the peers have been contacted, and this server's
Raft server id is the smallest among all contacted peers.
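
A simplified single-node driver loop, to show the intended call pattern.
send_peer_exchange() is a placeholder for the RPC added later in this
series, and the case where a remote peer reports an existing leader is
handled in that RPC layer, so it is omitted here:

    #include <seastar/core/sleep.hh>
    #include "service/raft/discovery.hh"

    // Placeholder: returns the remote peer's current peer list.
    extern future<service::discovery::peer_list> send_peer_exchange(
            const raft::server_address& to, const service::discovery::peer_list& our_list);

    future<> drive_discovery(service::discovery& d) {
        while (true) {
            auto output = d.get_output();
            if (std::holds_alternative<service::discovery::i_am_leader>(output)) {
                co_return;   // this server will bootstrap group 0
            }
            if (std::holds_alternative<service::discovery::pause>(output)) {
                co_await seastar::sleep(std::chrono::milliseconds(100));
                continue;
            }
            for (auto& [peer, peers] : std::get<service::discovery::request_list>(output)) {
                // Share our peer list; the reply is the remote peer's list.
                d.response(peer, co_await send_peer_exchange(peer, peers));
            }
        }
    }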
---
configure.py | 5 ++
service/raft/discovery.hh | 128 +++++++++++++++++++++++++++++++
service/raft/discovery.cc | 146 ++++++++++++++++++++++++++++++++++++
test/raft/discovery_test.cc | 111 +++++++++++++++++++++++++++
4 files changed, 390 insertions(+)
create mode 100644 service/raft/discovery.hh
create mode 100644 service/raft/discovery.cc
create mode 100644 test/raft/discovery_test.cc

diff --git a/configure.py b/configure.py
index aa973a3e60..a2b31f8857 100755
--- a/configure.py
+++ b/configure.py
@@ -557,6 +557,7 @@ raft_tests = set([
'test/raft/etcd_test',
'test/raft/raft_sys_table_storage_test',
'test/raft/raft_address_map_test',
+ 'test/raft/discovery_test',
])

apps = set([
@@ -998,6 +999,7 @@ scylla_core = (['database.cc',
'service/raft/raft_rpc.cc',
'service/raft/raft_gossip_failure_detector.cc',
'service/raft/raft_group_registry.cc',
+ 'service/raft/discovery.cc',
] + [Antlr3Grammar('cql3/Cql.g')] + [Thrift('interface/cassandra.thrift', 'Cassandra')] \
+ scylla_raft_core
)
@@ -1260,6 +1262,9 @@ deps['test/raft/etcd_test'] = ['test/raft/etcd_test.cc', 'test/lib/log.cc'] + s
deps['test/raft/raft_sys_table_storage_test'] = ['test/raft/raft_sys_table_storage_test.cc'] + \
scylla_raft_dependencies + scylla_tests_generic_dependencies
deps['test/raft/raft_address_map_test'] = ['test/raft/raft_address_map_test.cc'] + scylla_raft_dependencies
+deps['test/raft/discovery_test'] = ['test/raft/discovery_test.cc',
+ 'test/lib/log.cc',
+ 'service/raft/discovery.cc'] + scylla_minimal_raft_dependencies

deps['utils/gz/gen_crc_combine_table'] = ['utils/gz/gen_crc_combine_table.cc']

diff --git a/service/raft/discovery.hh b/service/raft/discovery.hh
new file mode 100644
index 0000000000..785f327a13
--- /dev/null
+++ b/service/raft/discovery.hh
@@ -0,0 +1,128 @@
+/*
+ * Copyright (C) 2021-present ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see <http://www.gnu.org/licenses/>.
+ */
+#pragma once
+#include "raft/raft.hh"
+
+namespace service {
+
+// Raft leader discovery FSM
+// https://github.com/kbr-/scylla-raft-boot/blob/master/boot.tla
+//
+// Contact all known peers, extending the transitive closure of
+// the known peers, sharing this server's Raft Id and the list of
+// its peers. Once the transitive closure of peers has been built,
+// select the peer with the smallest Raft Id to be the leader. To
+// be used during initial setup of Raft Group 0.
+class discovery {
+public:
+ // During discovery, peers are identified based on their Internet
+ // address, not Raft server id.
+ struct server_address_hash {
+ size_t operator()(const raft::server_address& address) const {
+ return std::hash<bytes>{}(address.info);
+ }
+ };
+ struct server_address_equal {
+ bool operator()(const raft::server_address& rhs, const raft::server_address& lhs) const {
+ return rhs.info == lhs.info;
+ }
+ };
+
+ // When a fresh cluster is bootstrapping, peer list is
+ // used to build a transitive closure of all cluster members
+ // and select an initial Raft configuration of the cluster.
+ using peer_list = std::vector<raft::server_address>;
+ using peer_set = std::unordered_set<raft::server_address, server_address_hash, server_address_equal>;
+ struct i_am_leader {};
+ struct pause {};
+ using request_list = std::vector<std::pair<raft::server_address, peer_list>>;
+ // @sa discovery::get_output()
+ using output = std::variant<i_am_leader, pause, request_list>;
+private:
+ raft::server_address _self;
+ // Assigned if this server elects itself a leader.
+ bool _is_leader = false;
+ // _seeds + all peers we've discovered, excludes _self
+ peer_set _peers;
+ // A subset of _peers which have responded to our requests, excludes _self.
+ peer_set _responded;
+ // _peers + self - the peer list we're sharing; if this node
+ // is a leader, empty list to save bandwidth
+ peer_list _peer_list;
+ // outstanding messages
+ request_list _requests;
+private:
+ // Update this state machine with new peer data and
+ // create outbound messages if necessary.
+ void step(const peer_list& peers);
+ // Check if we can run an election and elect this
+ // server a leader.
+ void maybe_become_leader();
+public:
+ // For construction, pass this server's Internet address and
+ // Raft id - and a set of seed Internet addresses. It's OK to
+ // leave Raft ids of seed peers unset, they will be updated as
+ // these peers respond.
+ //
+ // For discovery to work correctly the following must hold:
+ //
+ // - this server's Raft id must survive restarts.
+ // The opposite would be a Byzantine failure: imagine
+ // we generate and share a big id first, so another node
+ // elects itself a leader. Then this node restarts, generates
+ // the smallest known id and elects itself a leader too.
+ //
+ // - the seed graph must contain a vertex which is reachable from
+ // every other vertex, for example it can be fully
+ // connected, with either each server having at least one
+ // common seed or seed connections forming a loop. A rule of
+ // thumb is to use the same seed list everywhere.
+ //
+ discovery(raft::server_address self, const peer_list& seeds);
+
+ // To be used on the receiving peer to generate a reply
+ // while the discovery protocol is in progress. Always
+ // returns a peer list, even if this node is a leader,
+ // since leader state must be persisted first.
+ peer_list request(const peer_list& peers);
+
+ // Submit a reply from one of the peers to this discovery
+ // state machine. If this node is a leader, response is
+ // ignored.
+ void response(raft::server_address from, const peer_list& peers);
+
+ // Until all peers answer, returns a list of messages for the
+ // peers which haven't replied yet. As soon as all peers have
+ // replied, returns a pause{}, to allow some node to become
+ // a leader, and then a list of messages for all peers which
+ // can be used to find the leader. If this node is a leader,
+ // returns i_am_leader{}.
+ discovery::output get_output();
+
+ // A helper for testing.
+ bool is_leader() { return _is_leader; }
+
+ // A helper used for testing
+ raft::server_id id() const { return _self.id; }
+};
+
+} // namespace service
+
diff --git a/service/raft/discovery.cc b/service/raft/discovery.cc
new file mode 100644
index 0000000000..d97cfb6dbd
--- /dev/null
+++ b/service/raft/discovery.cc
@@ -0,0 +1,146 @@
+/*
+ * Copyright (C) 2021-present ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see <http://www.gnu.org/licenses/>.
+ */
+#include "service/raft/discovery.hh"
+
+namespace service {
+
+void check_peer(const raft::server_address& peer) {
+ if (!peer.info.size()) {
+ throw std::logic_error("Discovery requires peer internet address to be set");
+ }
+}
+
+discovery::discovery(raft::server_address self, const peer_list& seeds)
+ : _self(std::move(self)) {
+
+ // self must have a non-empty Internet address
+ check_peer(_self);
+ for (const auto& addr : seeds) {
+ check_peer(addr);
+ }
+ _peer_list.push_back(_self);
+
+ step(seeds);
+}
+
+void discovery::step(const peer_list& peers) {
+
+ if (_is_leader) {
+ return;
+ }
+
+ peer_set new_peers;
+ // Set to true if we learned about a new peer or
+ // received Raft server ID for one of the seeds.
+ bool refresh_peer_list = false;
+
+ for (const auto& addr : peers) {
+ // peer must have a non-empty Internet address
+ if (addr.info == _self.info) {
+ // do not include _self into _peers
+ continue;
+ }
+ auto it = _peers.find(addr);
+ // Update peer information if it's a new peer or provides
+ // a Raft ID for an existing peer.
+ if (it == _peers.end() || it->id == raft::server_id{}) {
+ refresh_peer_list = true;
+ if (it == _peers.end()) {
+ _peers.emplace(addr);
+ new_peers.emplace(addr);
+ } else {
+ // Update Raft ID
+ _peers.erase(it);
+ _peers.emplace(addr);
+ }
+ } else {
+ // If we have this peer, its ID must be the
+ // same as we know (with the exceptions of seeds,
+ // for which servers might not know ids at first).
+ assert(it == _peers.end() || it->id == addr.id || addr.id == raft::server_id{});
+ }
+ }
+ if (refresh_peer_list) {
+ _peer_list = {_peers.begin(), _peers.end()};
+ _peer_list.push_back(_self);
+ }
+ maybe_become_leader();
+ if (_is_leader) {
+ return;
+ }
+ for (const auto& peer : new_peers) {
+ _requests.push_back(std::make_pair(peer, _peer_list));
+ }
+}
+
+void discovery::maybe_become_leader() {
+ /*
+ * _responded is a subset of _peers.
+ * When all contacted peers have responded, we're ready
+ * to choose a node with the smallest id for the leader.
+ */
+ if (_responded.size() < _peers.size()) {
+ return;
+ }
+ auto min_id = std::min_element(_peer_list.begin(), _peer_list.end());
+ if (min_id != _peer_list.end() && min_id->id == _self.id) {
+ _is_leader = true;
+ }
+}
+
+discovery::peer_list discovery::request(const peer_list& peers) {
+ step(peers);
+ return _peer_list;
+}
+
+void discovery::response(raft::server_address from, const peer_list& peers) {
+ assert(_peers.contains(from));
+ _responded.emplace(from);
+ step(peers);
+}
+
+discovery::output discovery::get_output() {
+ if (_is_leader) {
+ return i_am_leader{};
+ } else if (!_requests.empty()) {
+ return std::move(_requests);
+ } else {
+ if (_responded.size() == _peers.size()) {
+ // All have responded, but we're not a leader.
+ // Try to find out who it is. Don't waste traffic on
+ // the peer list.
+ for (const auto& peer : _peers) {
+ _requests.push_back(std::make_pair(peer, peer_list{}));
+ }
+ } else {
+ // Contact peers which haven't responded yet
+ for (const auto& peer : _peers) {
+ if (_responded.contains(peer)) {
+ continue;
+ }
+ _requests.push_back(std::make_pair(peer, _peer_list));
+ }
+ }
+ return pause{};
+ }
+}
+
+} // end of namespace service
diff --git a/test/raft/discovery_test.cc b/test/raft/discovery_test.cc
new file mode 100644
index 0000000000..940c37ed07
--- /dev/null
+++ b/test/raft/discovery_test.cc
@@ -0,0 +1,111 @@
+/*
+ * Copyright (C) 2021-present ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see <http://www.gnu.org/licenses/>.
+ */
+#include "test/raft/helpers.hh"
+#include "service/raft/discovery.hh"
+
+using namespace raft;
+
+using discovery_network = std::unordered_map<server_id, service::discovery*>;
+
+using service::discovery;
+
+void
+run_discovery_impl(discovery_network& network) {
+ while (true) {
+ for (auto e : network) {
+ discovery& from = *e.second;
+ auto output = from.get_output();
+ if (std::holds_alternative<discovery::i_am_leader>(output)) {
+ return;
+ } else if (std::holds_alternative<discovery::pause>(output)) {
+ continue;
+ }
+ auto& msgs = std::get<discovery::request_list>(output);
+ for (auto&& m : msgs) {
+ auto it = network.find(m.first.id);
+ if (it == network.end()) {
+ // The node is not available, drop the message
+ continue;
+ }
+ discovery& to = *(it->second);
+ from.response(m.first, to.request(m.second));
+ }
+ }
+ }
+}
+
+template <typename... Args>
+void run_discovery(Args&&... args) {
+ discovery_network network;
+ auto add_node = [&network](discovery& node) -> void {
+ network.emplace(node.id(), &node);
+ };
+ (add_node(args), ...);
+ run_discovery_impl(network);
+}
+
+BOOST_AUTO_TEST_CASE(test_basic) {
+
+ server_address addr1 = {.id = id()};
+
+ // Must supply an Internet address for self
+ BOOST_CHECK_THROW(discovery(addr1, {}), std::logic_error);
+ server_address addr2 = {.id = id(), .info = "192.168.1.2"};
+ BOOST_CHECK_NO_THROW(discovery(addr2, {}));
+ // Must supply an Internet address for each peer
+ BOOST_CHECK_THROW(discovery(addr2, {addr1}), std::logic_error);
+ // OK to include self into peers
+ BOOST_CHECK_NO_THROW(discovery(addr2, {addr2}));
+ // With a single peer, discovery immediately finds a leader
+ discovery d(addr2, {});
+ BOOST_CHECK(d.is_leader());
+ d = discovery(addr2, {addr2});
+ BOOST_CHECK(d.is_leader());
+}
+
+
+BOOST_AUTO_TEST_CASE(test_discovery) {
+
+ server_address addr1 = {.id = id(), .info = "192.168.1.1"};
+ server_address addr2 = {.id = id(), .info = "192.168.1.2"};
+
+ discovery d1(addr1, {addr2});
+ discovery d2(addr2, {addr1});
+ run_discovery(d1, d2);
+
+ BOOST_CHECK(d1.is_leader() ^ d2.is_leader());
+}
+
+BOOST_AUTO_TEST_CASE(test_discovery_fullmesh) {
+
+ server_address addr1 = {.id = id(), .info = "127.0.0.13"};
+ server_address addr2 = {.id = id(), .info = "127.0.0.19"};
+ server_address addr3 = {.id = id(), .info = "127.0.0.21"};
+
+ auto seeds = std::vector<server_address>({addr1, addr2, addr3});
+
+ discovery d1(addr1, seeds);
+ discovery d2(addr2, seeds);
+ discovery d3(addr3, seeds);
+ run_discovery(d1, d2, d3);
+
+ BOOST_CHECK(d1.is_leader() ^ d2.is_leader() ^ d3.is_leader());
+}
--
2.25.1

Konstantin Osipov <kostja@scylladb.com>
Jul 24, 2021, 7:13:20 AM
to scylladb-dev@googlegroups.com, Konstantin Osipov
There is no easy way to get the server id otherwise.
---
raft/server.hh | 2 ++
raft/server.cc | 5 +++++
2 files changed, 7 insertions(+)

diff --git a/raft/server.hh b/raft/server.hh
index d5399e7751..c28831e27a 100644
--- a/raft/server.hh
+++ b/raft/server.hh
@@ -172,6 +172,8 @@ class server {
virtual void elapse_election() = 0;
virtual bool is_leader() = 0;
virtual void tick() = 0;
+ // Server id of this server
+ virtual raft::server_id id() const = 0;
private:
// Serialize concurrent CAS operations in a single Raft group
// XXX: the semaphore should be parameterized with clock type?
diff --git a/raft/server.cc b/raft/server.cc
index e63aed1567..47724eff27 100644
--- a/raft/server.cc
+++ b/raft/server.cc
@@ -78,6 +78,7 @@ class server_impl : public rpc_server, public server {
void elapse_election() override;
bool is_leader() override;
void tick() override;
+ raft::server_id id() const override;
future<> stepdown(logical_clock::duration timeout) override;
private:
std::unique_ptr<rpc> _rpc;
@@ -865,6 +866,10 @@ void server_impl::tick() {
_fsm->tick();
}

+raft::server_id server_impl::id() const {
+ return _id;
+}
+
const server_address_set& server_impl::get_rpc_config() const {
return _current_rpc_config;
}
--
2.25.1

Konstantin Osipov <kostja@scylladb.com>
Jul 24, 2021, 7:13:20 AM
to scylladb-dev@googlegroups.com, Konstantin Osipov
Implement a compare-and-swap operation which atomically
sets the configuration or applies an entry after
reading the current state of the state machine or the Raft
server.

Mention the yield requirements on fsm::add_entry()
implied by server::cas().
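
A hedged usage sketch of the configuration-updating overload of cas();
configuration::current is assumed here to be the raft::server_address_set
of the active configuration:

    future<> add_member(raft::server& srv, raft::server_address me) {
        co_await srv.cas([me] (const raft::server& s) -> future<raft::server_address_set> {
            // Runs after read_barrier(), so the configuration we read is
            // up to date for the term observed by cas().
            raft::server_address_set cfg = s.get_configuration().current;
            cfg.emplace(me);
            co_return cfg;   // cas() calls set_configuration(cfg) if the term is unchanged
        });
        // cas() throws raft::cas_aborted if the term changed in between.
    }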
---
raft/raft.hh | 4 ++++
raft/server.hh | 46 ++++++++++++++++++++++++++++++++++++----------
raft/server.cc | 7 ++++++-
3 files changed, 46 insertions(+), 11 deletions(-)

diff --git a/raft/raft.hh b/raft/raft.hh
index 9cc20b1cfe..b9fcaba723 100644
--- a/raft/raft.hh
+++ b/raft/raft.hh
@@ -232,6 +232,10 @@ struct no_other_voting_member : public error {
no_other_voting_member() : error("Cannot stepdown because there is no other voting member") {}
};

+struct cas_aborted : public error {
+ cas_aborted(term_t term) : error(format("Cannot complete Raft CAS due to a term change to {}", term)) {}
+};
+
// True if a failure to execute a Raft operation can be re-tried,
// perhaps with a different server.
inline bool is_transient_error(const std::exception& e) {
diff --git a/raft/server.hh b/raft/server.hh
index 50b80188fa..d5399e7751 100644
--- a/raft/server.hh
+++ b/raft/server.hh
@@ -20,6 +20,7 @@
*/
#pragma once
#include "raft.hh"
+#include <seastar/core/semaphore.hh>

namespace raft {

@@ -124,17 +125,9 @@ class server {
// machine and add_entry() will be linearised as well.
//
// To sum up, @read_barrier() can be used as a poor man
- // distributed Compare-And-Swap:
+ // distributed Compare-And-Swap. You can find example use in
+ // raft::server::cas()
//
- // lock()
- // term_t term = get_current_term()
- // co_await read_barrier()
- // ... Read previous value from the state machine ...
- // ... Create a new value ...
- // if (term == get_current_term())) {
- // co_await add_entry();
- // }
- // unlock()
virtual future<> read_barrier() = 0;

// Initiate leader stepdown process.
@@ -142,6 +135,35 @@ class server {
// In case of a timeout returns timeout_error.
virtual future<> stepdown(logical_clock::duration timeout) = 0;

+ // Atomically read the state of the state machine and apply
+ // the function output to it. Please note that since the lock
+ // protection here is advisory, only the state which is
+ // always modified through cas() can be accessed in a linearizable
+ // manner; if the state is modified both via cas() and
+ // a simple add_entry(), linearizability is not guaranteed.
+ template<typename Func>
+ requires (std::is_invocable_r_v<future<raft::command>, Func, raft::server&> ||
+ std::is_invocable_r_v<future<raft::server_address_set>, Func, const raft::server&>)
+ future<> cas(Func func) {
+ co_await with_semaphore(_cas_sem, 1, [this, func1 = std::move(func)] () -> future<> {
+ // Workaround https://gcc.gnu.org/bugzilla/show_bug.cgi?id=95111
+ auto func = std::move(func1);
+ term_t term = get_current_term();
+ co_await read_barrier();
+ auto r = co_await func(*this);
+ if (term == get_current_term()) {
+ using T = std::decay_t<decltype(r)>;
+ if constexpr (std::is_same_v<T, raft::command>) {
+ co_await add_entry(r, wait_type::applied);
+ } else if constexpr (std::is_same_v<T, raft::server_address_set>) {
+ co_await set_configuration(r);
+ }
+ } else {
+ throw cas_aborted(get_current_term());
+ }
+ });
+ }
+
// Ad hoc functions for testing
virtual void wait_until_candidate() = 0;
virtual future<> wait_election_done() = 0;
@@ -150,6 +172,10 @@ class server {
virtual void elapse_election() = 0;
virtual bool is_leader() = 0;
virtual void tick() = 0;
+private:
+ // Serialize concurrent CAS operations in a single Raft group
+ // XXX: the semaphore should be parameterized with clock type?
+ semaphore _cas_sem{1};
};

std::unique_ptr<server> create_server(server_id uuid, std::unique_ptr<rpc> rpc,
diff --git a/raft/server.cc b/raft/server.cc
index 0ba61e5dbd..e63aed1567 100644
--- a/raft/server.cc
+++ b/raft/server.cc
@@ -306,11 +306,16 @@ template <typename T>
future<> server_impl::add_entry_internal(T command, wait_type type) {
logger.trace("An entry is submitted on a leader");

- // Wait for a new slot to become available
+ // Wait for a new slot to become available. Note that this
+ // works fine with server::cas() since if we lose leadership
+ // while waiting the semaphore is set to broken state.
+ // Concurrent cas() is not possible due to a lock around the
+ // cas() itself.
co_await _fsm->wait_max_log_size();

logger.trace("An entry proceeds after wait");

+ // Sic: server::cas() assumes this doesn't yield.
const log_entry& e = _fsm->add_entry(std::move(command));

auto& container = type == wait_type::committed ? _awaited_commits : _awaited_applies;
--
2.25.1

Konstantin Osipov <kostja@scylladb.com>
Jul 24, 2021, 7:13:21 AM
to scylladb-dev@googlegroups.com, Konstantin Osipov
Break the raft_rpc <-> raft_group_registry dependency loop
via raft_address_map. Pass raft_address_map to raft_rpc and
raft_gossip_failure_detector explicitly, not the entire raft_group_registry.

Extract server_for_group into a helper struct. It's going to be used by
raft_group0, so make it easier to reference.
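
The new wiring, using the constructor signatures from the diff below
(ms, gossiper, gid and srv_id come from the surrounding registry code):

    raft_address_map<> addr_map;   // shared server_id -> inet_address mapping
    auto fd = seastar::make_shared<raft_gossip_failure_detector>(gossiper, addr_map);
    auto rpc = std::make_unique<raft_rpc>(ms, addr_map, gid, srv_id);
    // The registry keeps owning the map and exposes it via address_map().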
---
service/raft/raft_address_map.hh | 19 ++++++++++
service/raft/raft_gossip_failure_detector.hh | 8 ++---
service/raft/raft_group_registry.hh | 36 +++++++------------
service/raft/raft_rpc.hh | 10 +++---
service/raft/raft_gossip_failure_detector.cc | 7 ++--
service/raft/raft_group_registry.cc | 37 ++++++--------------
service/raft/raft_rpc.cc | 21 ++++++-----
7 files changed, 64 insertions(+), 74 deletions(-)

diff --git a/service/raft/raft_address_map.hh b/service/raft/raft_address_map.hh
index 20f95ddcda..3872f7fcd8 100644
--- a/service/raft/raft_address_map.hh
+++ b/service/raft/raft_address_map.hh
@@ -21,6 +21,7 @@
#pragma once

#include "gms/inet_address.hh"
+#include "gms/inet_address_serializer.hh"
#include "raft/raft.hh"

#include <seastar/core/lowres_clock.hh>
@@ -38,6 +39,10 @@ namespace service {

extern seastar::logger rslog;

+using raft_ticker_type = seastar::timer<lowres_clock>;
+// TODO: should be configurable.
+static constexpr raft_ticker_type::duration raft_tick_interval = std::chrono::milliseconds(100);
+
// This class provides an abstraction of expirable server address mappings
// used by the raft rpc module to store connection info for servers in a raft group.
template <typename Clock = seastar::lowres_clock>
@@ -334,6 +339,20 @@ class raft_address_map {
// Erase both from LRU list and base storage
unlink_and_dispose(set_it);
}
+
+ // Map raft server_id to inet_address to be consumed by `messaging_service`
+ gms::inet_address get_inet_address(raft::server_id id) const {
+ auto it = find(id);
+ if (!it) {
+ on_internal_error(rslog, format("Destination raft server not found with id {}", id));
+ }
+ return *it;
+ }
+ raft::server_address get_server_address(raft::server_id id) const {
+ return raft::server_address{.id = id,
+ .info = ser::serialize_to_buffer<bytes>(get_inet_address(id))
+ };
+ }
};

} // end of namespace service
diff --git a/service/raft/raft_gossip_failure_detector.hh b/service/raft/raft_gossip_failure_detector.hh
index 748d7594a2..978e5fe690 100644
--- a/service/raft/raft_gossip_failure_detector.hh
+++ b/service/raft/raft_gossip_failure_detector.hh
@@ -21,7 +21,7 @@
*/
#pragma once

-#include "raft/raft.hh"
+#include "raft_address_map.hh"

namespace gms {
class gossiper;
@@ -29,18 +29,16 @@ class gossiper;

namespace service {

-class raft_group_registry;
-
// Scylla-specific implementation of raft failure detector module.
//
// Currently uses gossiper as underlying implementation to test for `is_alive(gms::inet_address)`.
// Gets the mapping from server id to gms::inet_address from RPC module.
class raft_gossip_failure_detector : public raft::failure_detector {
gms::gossiper& _gossip;
- raft_group_registry& _raft_gr;
+ raft_address_map<>& _address_map;

public:
- raft_gossip_failure_detector(gms::gossiper& gs, raft_group_registry& raft_gr);
+ raft_gossip_failure_detector(gms::gossiper& gs, raft_address_map<>& address_map);

bool is_alive(raft::server_id server) override;
};
diff --git a/service/raft/raft_group_registry.hh b/service/raft/raft_group_registry.hh
index a785b20027..018f67e4f2 100644
--- a/service/raft/raft_group_registry.hh
+++ b/service/raft/raft_group_registry.hh
@@ -53,16 +53,20 @@ struct raft_group_not_found: public raft::error {
{}
};

+// An entry in the group registry
+struct raft_server_for_group {
+ raft::group_id gid;
+ std::unique_ptr<raft::server> server;
+ std::unique_ptr<raft_ticker_type> ticker;
+ raft_rpc& rpc;
+};
+
// This class is responsible for creating, storing and accessing raft servers.
// It also manages the raft rpc verbs initialization.
//
// `peering_sharded_service` inheritance is used to forward requests
// to the owning shard for a given raft group_id.
class raft_group_registry : public seastar::peering_sharded_service<raft_group_registry> {
-public:
- using ticker_type = seastar::timer<lowres_clock>;
- // TODO: should be configurable.
- static constexpr ticker_type::duration tick_interval = std::chrono::milliseconds(100);
private:
netw::messaging_service& _ms;
gms::gossiper& _gossiper;
@@ -70,15 +74,9 @@ class raft_group_registry : public seastar::peering_sharded_service<raft_group_r
// Shard-local failure detector instance shared among all raft groups
shared_ptr<raft_gossip_failure_detector> _fd;

- struct server_for_group {
- raft::group_id gid;
- std::unique_ptr<raft::server> server;
- std::unique_ptr<ticker_type> ticker;
- raft_rpc& rpc;
- };
// Raft servers along with the corresponding timers to tick each instance.
// Currently ticking every 100ms.
- std::unordered_map<raft::group_id, server_for_group> _servers;
+ std::unordered_map<raft::group_id, raft_server_for_group> _servers;
// inet_address:es for remote raft servers known to us
raft_address_map<> _srv_address_mappings;

@@ -86,9 +84,9 @@ class raft_group_registry : public seastar::peering_sharded_service<raft_group_r
seastar::future<> uninit_rpc_verbs();
seastar::future<> stop_servers();

- server_for_group create_server_for_group(raft::group_id id);
+ raft_server_for_group create_server_for_group(raft::group_id id);

- server_for_group& get_server_for_group(raft::group_id id);
+ raft_server_for_group& server_for_group(raft::group_id id);
public:

raft_group_registry(netw::messaging_service& ms, gms::gossiper& gs, sharded<cql3::query_processor>& qp);
@@ -114,17 +112,9 @@ class raft_group_registry : public seastar::peering_sharded_service<raft_group_r

// Start raft server instance, store in the map of raft servers and
// arm the associated timer to tick the server.
- future<> start_server_for_group(server_for_group grp);
+ future<> start_server_for_group(raft_server_for_group grp);
unsigned shard_for_group(const raft::group_id& gid) const;
-
- // Map raft server_id to inet_address to be consumed by `messaging_service`
- gms::inet_address get_inet_address(raft::server_id id) const;
- // Update inet_address mapping for a raft server with a given id.
- // In case a mapping exists for a given id, it should be equal to the supplied `addr`
- // otherwise the function will throw.
- void update_address_mapping(raft::server_id id, gms::inet_address addr, bool expiring);
- // Remove inet_address mapping for a raft server
- void remove_address_mapping(raft::server_id);
+ raft_address_map<>& address_map() { return _srv_address_mappings; }
};

} // end of namespace service
diff --git a/service/raft/raft_rpc.hh b/service/raft/raft_rpc.hh
index ec03112e09..46e250e06a 100644
--- a/service/raft/raft_rpc.hh
+++ b/service/raft/raft_rpc.hh
@@ -24,7 +24,7 @@
#include "raft/raft.hh"
#include "message/messaging_service_fwd.hh"
#include "utils/UUID.hh"
-#include "service/raft/raft_group_registry.hh"
+#include "service/raft/raft_address_map.hh"

namespace service {

@@ -36,15 +36,15 @@ class raft_rpc : public raft::rpc {
raft::group_id _group_id;
raft::server_id _server_id;
netw::messaging_service& _messaging;
- raft_group_registry& _raft_gr;
+ raft_address_map<>& _address_map;
seastar::gate _shutdown_gate;

- raft_group_registry::ticker_type::time_point timeout() {
- return raft_group_registry::ticker_type::clock::now() + raft_group_registry::tick_interval * (raft::ELECTION_TIMEOUT.count() / 2);
+ raft_ticker_type::time_point timeout() {
+ return raft_ticker_type::clock::now() + raft_tick_interval * (raft::ELECTION_TIMEOUT.count() / 2);
}

public:
- explicit raft_rpc(netw::messaging_service& ms, raft_group_registry& raft_gr, raft::group_id gid, raft::server_id srv_id);
+ explicit raft_rpc(netw::messaging_service& ms, raft_address_map<>& address_map, raft::group_id gid, raft::server_id srv_id);

future<raft::snapshot_reply> send_snapshot(raft::server_id server_id, const raft::install_snapshot& snap, seastar::abort_source& as) override;
future<> send_append_entries(raft::server_id id, const raft::append_request& append_request) override;
diff --git a/service/raft/raft_gossip_failure_detector.cc b/service/raft/raft_gossip_failure_detector.cc
index 083550c598..7913931618 100644
--- a/service/raft/raft_gossip_failure_detector.cc
+++ b/service/raft/raft_gossip_failure_detector.cc
@@ -20,17 +20,16 @@
*/

#include "service/raft/raft_gossip_failure_detector.hh"
-#include "service/raft/raft_group_registry.hh"
#include "gms/gossiper.hh"

namespace service {

-raft_gossip_failure_detector::raft_gossip_failure_detector(gms::gossiper& gs, raft_group_registry& raft_gr)
- : _gossip(gs), _raft_gr(raft_gr)
+raft_gossip_failure_detector::raft_gossip_failure_detector(gms::gossiper& gs, raft_address_map<>& address_map)
+ : _gossip(gs), _address_map(address_map)
{}

bool raft_gossip_failure_detector::is_alive(raft::server_id server) {
- return _gossip.is_alive(_raft_gr.get_inet_address(server));
+ return _gossip.is_alive(_address_map.get_inet_address(server));
}

} // end of namespace service
diff --git a/service/raft/raft_group_registry.cc b/service/raft/raft_group_registry.cc
index e363b38e89..685e23941c 100644
--- a/service/raft/raft_group_registry.cc
+++ b/service/raft/raft_group_registry.cc
@@ -39,7 +39,7 @@ namespace service {
logging::logger rslog("raft_group_registry");

raft_group_registry::raft_group_registry(netw::messaging_service& ms, gms::gossiper& gs, sharded<cql3::query_processor>& qp)
- : _ms(ms), _gossiper(gs), _qp(qp), _fd(make_shared<raft_gossip_failure_detector>(gs, *this))
+ : _ms(ms), _gossiper(gs), _qp(qp), _fd(make_shared<raft_gossip_failure_detector>(gs, _srv_address_mappings))
{
(void) _gossiper;
}
@@ -55,7 +55,7 @@ void raft_group_registry::init_rpc_verbs() {
auto& rpc = self.get_rpc(gid);
// The address learnt from a probably unknown server should
// eventually expire
- self.update_address_mapping(from, std::move(addr), true);
+ self._srv_address_mappings.set(from, std::move(addr), true);
// Execute the actual message handling code
return handler(rpc);
});
@@ -142,7 +142,7 @@ seastar::future<> raft_group_registry::uninit() {
});
}

-raft_group_registry::server_for_group& raft_group_registry::get_server_for_group(raft::group_id gid) {
+raft_server_for_group& raft_group_registry::server_for_group(raft::group_id gid) {
auto it = _servers.find(gid);
if (it == _servers.end()) {
throw raft_group_not_found(gid);
@@ -151,17 +151,17 @@ raft_group_registry::server_for_group& raft_group_registry::get_server_for_group
}

raft_rpc& raft_group_registry::get_rpc(raft::group_id gid) {
- return get_server_for_group(gid).rpc;
+ return server_for_group(gid).rpc;
}

raft::server& raft_group_registry::get_server(raft::group_id gid) {
- return *(get_server_for_group(gid).server);
+ return *(server_for_group(gid).server);
}

-raft_group_registry::server_for_group raft_group_registry::create_server_for_group(raft::group_id gid) {
+raft_server_for_group raft_group_registry::create_server_for_group(raft::group_id gid) {

raft::server_id my_id = raft::server_id::create_random_id();
- auto rpc = std::make_unique<raft_rpc>(_ms, *this, gid, my_id);
+ auto rpc = std::make_unique<raft_rpc>(_ms, _srv_address_mappings, gid, my_id);
// Keep a reference to a specific RPC class.
auto& rpc_ref = *rpc;
auto storage = std::make_unique<raft_sys_table_storage>(_qp.local(), gid);
@@ -170,9 +170,9 @@ raft_group_registry::server_for_group raft_group_registry::create_server_for_gro
std::move(storage), _fd, raft::server::configuration());

// initialize the corresponding timer to tick the raft server instance
- auto ticker = std::make_unique<ticker_type>([srv = server.get()] { srv->tick(); });
+ auto ticker = std::make_unique<raft_ticker_type>([srv = server.get()] { srv->tick(); });

- return server_for_group{
+ return raft_server_for_group{
.gid = std::move(gid),
.server = std::move(server),
.ticker = std::move(ticker),
@@ -180,7 +180,7 @@ raft_group_registry::server_for_group raft_group_registry::create_server_for_gro
};
}

-future<> raft_group_registry::start_server_for_group(server_for_group new_grp) {
+future<> raft_group_registry::start_server_for_group(raft_server_for_group new_grp) {
auto gid = new_grp.gid;
auto [it, inserted] = _servers.emplace(std::move(gid), std::move(new_grp));

@@ -199,27 +199,12 @@ future<> raft_group_registry::start_server_for_group(server_for_group new_grp) {
on_internal_error(rslog, std::current_exception());
}

- grp.ticker->arm_periodic(tick_interval);
+ grp.ticker->arm_periodic(raft_tick_interval);
}

unsigned raft_group_registry::shard_for_group(const raft::group_id& gid) const {
return 0; // schema raft server is always owned by shard 0
}

-gms::inet_address raft_group_registry::get_inet_address(raft::server_id id) const {
- auto it = _srv_address_mappings.find(id);
- if (!it) {
- on_internal_error(rslog, format("Destination raft server not found with id {}", id));
- }
- return *it;
-}
-
-void raft_group_registry::update_address_mapping(raft::server_id id, gms::inet_address addr, bool expiring) {
- _srv_address_mappings.set(id, std::move(addr), expiring);
-}
-
-void raft_group_registry::remove_address_mapping(raft::server_id id) {
- _srv_address_mappings.erase(id);
-}

} // end of namespace service
diff --git a/service/raft/raft_rpc.cc b/service/raft/raft_rpc.cc
index 774be77eeb..a1d3c26453 100644
--- a/service/raft/raft_rpc.cc
+++ b/service/raft/raft_rpc.cc
@@ -19,7 +19,6 @@
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "service/raft/raft_rpc.hh"
-#include "service/raft/raft_group_registry.hh"
#include "gms/inet_address.hh"
#include "gms/inet_address_serializer.hh"
#include "serializer_impl.hh"
@@ -31,23 +30,23 @@ namespace service {

static seastar::logger rlogger("raft_rpc");

-raft_rpc::raft_rpc(netw::messaging_service& ms, raft_group_registry& raft_gr, raft::group_id gid, raft::server_id srv_id)
- : _group_id(std::move(gid)), _server_id(srv_id), _messaging(ms), _raft_gr(raft_gr)
+raft_rpc::raft_rpc(netw::messaging_service& ms, raft_address_map<>& address_map, raft::group_id gid, raft::server_id srv_id)
+ : _group_id(std::move(gid)), _server_id(srv_id), _messaging(ms), _address_map(address_map)
{}

future<raft::snapshot_reply> raft_rpc::send_snapshot(raft::server_id id, const raft::install_snapshot& snap, seastar::abort_source& as) {
return _messaging.send_raft_snapshot(
- netw::msg_addr(_raft_gr.get_inet_address(id)), db::no_timeout, _group_id, _server_id, id, snap);
+ netw::msg_addr(_address_map.get_inet_address(id)), db::no_timeout, _group_id, _server_id, id, snap);
}

future<> raft_rpc::send_append_entries(raft::server_id id, const raft::append_request& append_request) {
return _messaging.send_raft_append_entries(
- netw::msg_addr(_raft_gr.get_inet_address(id)), db::no_timeout, _group_id, _server_id, id, append_request);
+ netw::msg_addr(_address_map.get_inet_address(id)), db::no_timeout, _group_id, _server_id, id, append_request);
}

void raft_rpc::send_append_entries_reply(raft::server_id id, const raft::append_reply& reply) {
(void)with_gate(_shutdown_gate, [this, id, &reply] {
- return _messaging.send_raft_append_entries_reply(netw::msg_addr(_raft_gr.get_inet_address(id)), timeout(), _group_id, _server_id, id, reply)
+ return _messaging.send_raft_append_entries_reply(netw::msg_addr(_address_map.get_inet_address(id)), timeout(), _group_id, _server_id, id, reply)
.handle_exception([id] (std::exception_ptr ex) {
try {
std::rethrow_exception(ex);
@@ -61,7 +60,7 @@ void raft_rpc::send_append_entries_reply(raft::server_id id, const raft::append_

void raft_rpc::send_vote_request(raft::server_id id, const raft::vote_request& vote_request) {
(void)with_gate(_shutdown_gate, [this, id, vote_request] {
- return _messaging.send_raft_vote_request(netw::msg_addr(_raft_gr.get_inet_address(id)), timeout(), _group_id, _server_id, id, vote_request)
+ return _messaging.send_raft_vote_request(netw::msg_addr(_address_map.get_inet_address(id)), timeout(), _group_id, _server_id, id, vote_request)
.handle_exception([id] (std::exception_ptr ex) {
try {
std::rethrow_exception(ex);
@@ -75,7 +74,7 @@ void raft_rpc::send_vote_request(raft::server_id id, const raft::vote_request& v

void raft_rpc::send_vote_reply(raft::server_id id, const raft::vote_reply& vote_reply) {
(void)with_gate(_shutdown_gate, [this, id, vote_reply] {
- return _messaging.send_raft_vote_reply(netw::msg_addr(_raft_gr.get_inet_address(id)), timeout(), _group_id, _server_id, id, vote_reply)
+ return _messaging.send_raft_vote_reply(netw::msg_addr(_address_map.get_inet_address(id)), timeout(), _group_id, _server_id, id, vote_reply)
.handle_exception([id] (std::exception_ptr ex) {
try {
std::rethrow_exception(ex);
@@ -89,7 +88,7 @@ void raft_rpc::send_vote_reply(raft::server_id id, const raft::vote_reply& vote_

void raft_rpc::send_timeout_now(raft::server_id id, const raft::timeout_now& timeout_now) {
(void)with_gate(_shutdown_gate, [this, id, timeout_now] {
- return _messaging.send_raft_timeout_now(netw::msg_addr(_raft_gr.get_inet_address(id)), timeout(), _group_id, _server_id, id, timeout_now)
+ return _messaging.send_raft_timeout_now(netw::msg_addr(_address_map.get_inet_address(id)), timeout(), _group_id, _server_id, id, timeout_now)
.handle_exception([id] (std::exception_ptr ex) {
try {
std::rethrow_exception(ex);
@@ -106,11 +105,11 @@ void raft_rpc::add_server(raft::server_id id, raft::server_info info) {
auto in = ser::as_input_stream(bytes_view(info));
// Entries explicitly managed via `rpc::add_server` and `rpc::remove_server` should never expire
// while entries learnt upon receiving an rpc message should be expirable.
- _raft_gr.update_address_mapping(id, ser::deserialize(in, boost::type<gms::inet_address>()), false);
+ _address_map.set(id, ser::deserialize(in, boost::type<gms::inet_address>()), false);
}

void raft_rpc::remove_server(raft::server_id id) {
- _raft_gr.remove_address_mapping(id);
+ _address_map.erase(id);
}

future<> raft_rpc::abort() {
--
2.25.1

Konstantin Osipov <kostja@scylladb.com>
Jul 24, 2021, 7:13:22 AM
to scylladb-dev@googlegroups.com, Konstantin Osipov
Lambdas are captured by reference, so copy the
captured lambda to the coroutine frame before yielding.
---
service/raft/raft_sys_table_storage.cc | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/service/raft/raft_sys_table_storage.cc b/service/raft/raft_sys_table_storage.cc
index 2fb4e5c879..866d2e5127 100644
--- a/service/raft/raft_sys_table_storage.cc
+++ b/service/raft/raft_sys_table_storage.cc
@@ -242,9 +242,12 @@ future<> raft_sys_table_storage::truncate_log_tail(raft::index_t idx) {
future<> raft_sys_table_storage::execute_with_linearization_point(std::function<future<>()> f) {
promise<> task_promise;
auto pending_fut = std::exchange(_pending_op_fut, task_promise.get_future());
+ // Capture lambda state on local coroutine frame, see
+ // https://gcc.gnu.org/bugzilla/show_bug.cgi?id=95111
+ auto cmd = std::move(f);
co_await std::move(pending_fut);
try {
- co_await f();
+ co_await cmd();
task_promise.set_value();
} catch (...) {
task_promise.set_exception(std::current_exception());
--
2.25.1

Konstantin Osipov <kostja@scylladb.com>
Jul 24, 2021, 7:13:23 AM
to scylladb-dev@googlegroups.com, Konstantin Osipov
Operations adding or removing a node to/from the Raft configuration
are made idempotent: they do nothing if already done, and
they are safe to resume after a failure.

However, since topology changes are not transactional, if a
bootstrap or removal procedure fails midway, the Raft group 0
configuration may go out of sync with the topology state as seen by
gossip.

In the future we must change gossip to avoid making any persistent
changes to the cluster: all changes to persistent topology state
will be done exclusively through Raft Group 0.

Specifically, instead of persisting the tokens by advertising
them through gossip, the bootstrap will commit a change to a system
table using Raft group 0. nodetool will switch from looking at
gossip-managed tables to consulting the Raft Group 0 configuration.
Once this transformation is done, naturally, adding a node to the Raft
configuration will become the first persistent change to ring state
applied when a node joins; removing a node from the Raft Group 0
configuration will become the last action when removing a node.

Until this is done, do our best to avoid a cluster state where
a removed node, or a node whose addition failed, is stuck in the Raft
configuration while no longer present in gossip-managed
system tables. In other words, keep gossip the primary source of
truth. For this purpose, carefully choose the timing of when we
join and leave Raft group 0:

Join Raft group 0 only after we've advertised our tokens, so the
cluster is aware of this node and it's visible in nodetool status,
but before the node state jumps to "normal", i.e. before it accepts
queries. Since the operation is idempotent, invoke it on each
restart.

Remove the node from Group 0 *before* its tokens are removed
from gossip-managed system tables. This guarantees
that if removal from Raft group 0 fails for whatever reason,
the node stays in the ring, so nodetool removenode and
friends can be re-tried.
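
A rough sketch of the resulting ordering, with hypothetical helper names
(the real code lives in storage_service.cc and raft_group0.cc):

    // Bootstrap: tokens first, then group 0, then serve traffic.
    future<> bootstrap_sequence(raft_group0& group0) {
        co_await advertise_tokens_via_gossip();   // node shows up in nodetool status
        co_await group0.join_group0();            // idempotent, re-run on every restart
        co_await mark_node_as_normal();           // only now accept queries
    }

    // removenode: leave group 0 first, so a failure leaves the node in the ring.
    future<> removenode_sequence(raft_group0& group0, raft::server_id id) {
        co_await group0.leave_group0(id);         // on failure, removenode can be re-tried
        co_await remove_tokens_from_gossip(id);
    }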

While previously the raft service was simply a container
for raft groups, now it can perform lengthy, blocking
I/O in a loop. This has to be interruptible, e.g. during
shutdown.

Also in this patch:
* add tracing
* make group 0 available on shard 0, directly from the group registry
* add RPC stubs for the Raft topology changes - leader discovery,
  raft_add_server and raft_remove_server - and stub implementations
  for the RPC callbacks
---
configure.py | 2 +
idl/group0.idl.hh | 43 ++++
message/messaging_service.hh | 18 +-
service/raft/messaging.hh | 63 ++++++
service/raft/raft_group0.hh | 88 ++++++++
service/raft/raft_group_registry.hh | 73 +++----
service/storage_service.hh | 6 +-
main.cc | 16 +-
message/messaging_service.cc | 42 ++++
service/raft/raft_group0.cc | 301 ++++++++++++++++++++++++++++
service/raft/raft_group_registry.cc | 154 ++++++++++----
service/storage_service.cc | 52 ++++-
test/boost/gossip_test.cc | 3 +-
test/lib/cql_test_env.cc | 18 +-
test/manual/gossip.cc | 2 +-
15 files changed, 780 insertions(+), 101 deletions(-)
create mode 100644 idl/group0.idl.hh
create mode 100644 service/raft/messaging.hh
create mode 100644 service/raft/raft_group0.hh
create mode 100644 service/raft/raft_group0.cc

diff --git a/configure.py b/configure.py
index a2b31f8857..20a3a619da 100755
--- a/configure.py
+++ b/configure.py
@@ -1000,6 +1000,7 @@ scylla_core = (['database.cc',
'service/raft/raft_gossip_failure_detector.cc',
'service/raft/raft_group_registry.cc',
'service/raft/discovery.cc',
+ 'service/raft/raft_group0.cc',
] + [Antlr3Grammar('cql3/Cql.g')] + [Thrift('interface/cassandra.thrift', 'Cassandra')] \
+ scylla_raft_core
)
@@ -1099,6 +1100,7 @@ idls = ['idl/gossip_digest.idl.hh',
'idl/messaging_service.idl.hh',
'idl/paxos.idl.hh',
'idl/raft.idl.hh',
+ 'idl/group0.idl.hh',
'idl/hinted_handoff.idl.hh',
]

diff --git a/idl/group0.idl.hh b/idl/group0.idl.hh
new file mode 100644
index 0000000000..1c17d1779a
--- /dev/null
+++ b/idl/group0.idl.hh
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2021-present ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+namespace std {
+
+struct monostate {};
+
+}
+
+namespace service {
+
+struct raft_leader_info {
+ raft::group_id group0_id;
+ raft::server_address addr;
+};
+
+struct raft_peer_exchange {
+ std::variant<std::monostate, service::raft_leader_info, std::vector<raft::server_address>> info;
+};
+
+struct raft_success_or_bounce {
+ std::optional<raft::server_address> bounce;
+};
+
+} // namespace service
diff --git a/message/messaging_service.hh b/message/messaging_service.hh
index 55251117ff..84a04c3488 100644
--- a/message/messaging_service.hh
+++ b/message/messaging_service.hh
@@ -41,6 +41,7 @@
#include "cache_temperature.hh"
#include "service/paxos/prepare_response.hh"
#include "raft/raft.hh"
+#include "service/raft/messaging.hh"
#include "db/hints/messages.hh"

#include <list>
@@ -162,7 +163,10 @@ enum class messaging_verb : int32_t {
RAFT_TIMEOUT_NOW = 51,
HINT_SYNC_POINT_CREATE = 52,
HINT_SYNC_POINT_CHECK = 53,
- LAST = 54,
+ RAFT_PEER_EXCHANGE = 54,
+ RAFT_ADD_SERVER = 55,
+ RAFT_REMOVE_SERVER = 56,
+ LAST = 57,
};

} // namespace netw
@@ -593,6 +597,18 @@ class messaging_service : public seastar::async_sharded_service<messaging_servic
future<> unregister_raft_timeout_now();
future<> send_raft_timeout_now(msg_addr id, clock_type::time_point timeout, raft::group_id, raft::server_id from_id, raft::server_id dst_id, const raft::timeout_now& timeout_now);

+ void register_raft_peer_exchange(std::function<future<service::raft_peer_exchange> (const rpc::client_info&, rpc::opt_time_point, std::vector<raft::server_address>)>&& func);
+ future<> unregister_raft_peer_exchange();
+ future<service::raft_peer_exchange> send_raft_peer_exchange(msg_addr id, clock_type::time_point timeout, const std::vector<raft::server_address>& peers);
+
+ void register_raft_add_server(std::function<future<service::raft_success_or_bounce>(const rpc::client_info&, rpc::opt_time_point, raft::group_id gid, raft::server_address addr)>&& func);
+ future<> unregister_raft_add_server();
+ future<service::raft_success_or_bounce> send_raft_add_server(msg_addr id, clock_type::time_point timeout, raft::group_id gid, raft::server_address addr);
+
+ void register_raft_remove_server(std::function<future<service::raft_success_or_bounce>(const rpc::client_info&, rpc::opt_time_point, raft::group_id gid, raft::server_id sid)>&& func);
+ future<> unregister_raft_remove_server();
+ future<service::raft_success_or_bounce> send_raft_remove_server(msg_addr id, clock_type::time_point timeout, raft::group_id gid, raft::server_id sid);
+
void foreach_server_connection_stats(std::function<void(const rpc::client_info&, const rpc::stats&)>&& f) const;
private:
bool remove_rpc_client_one(clients_map& clients, msg_addr id, bool dead_only);
diff --git a/service/raft/messaging.hh b/service/raft/messaging.hh
new file mode 100644
index 0000000000..0e104d6d92
--- /dev/null
+++ b/service/raft/messaging.hh
@@ -0,0 +1,63 @@
+/*
+ * Copyright (C) 2021-present ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see <http://www.gnu.org/licenses/>.
+ */
+#pragma once
+#include "raft/raft.hh"
+/////////////////////////////////////////
+// Discovery RPC supporting message types
+
+namespace service {
+
+// Used in a bootstrapped Scylla cluster, provides group 0
+// identifier and the current group leader address.
+struct raft_leader_info {
+ raft::group_id group0_id;
+ raft::server_address addr;
+ bool operator==(const raft_leader_info& rhs) const {
+ return rhs.group0_id == group0_id && rhs.addr == addr;
+ }
+};
+
+// If the peer has no cluster discovery running, it returns
+// std::monostate, which means the caller needs to retry
+// contacting this server after a pause. Otherwise it returns
+// its leader data or a list of peers.
+struct raft_peer_exchange {
+ std::variant<std::monostate, raft_leader_info, std::vector<raft::server_address>> info;
+};
+
+// Raft RPC such as add_entries may either succeed or, if the
+// server is not a leader or failed while the operation was in
+// progress, suggest another server. This is the representation
+// of such a response. It is legal to return success_or_bounce
+// in cases of uncertainty (the operation may have succeeded);
+// if the user decides to retry with the returned bounce
+// address, it must ensure the request is idempotent. The user
+// of success_or_bounce should also be prepared to handle a
+// standard kind of exception, e.g. an RPC timeout, in which
+// case it can retry with the same leader address.
+struct raft_success_or_bounce {
+ // Set if the client should retry with another leader.
+ std::optional<raft::server_address> bounce;
+};
+
+/////////////////////////////////////////
+} // end of namespace service
+
diff --git a/service/raft/raft_group0.hh b/service/raft/raft_group0.hh
new file mode 100644
index 0000000000..0dd02cca1f
--- /dev/null
+++ b/service/raft/raft_group0.hh
@@ -0,0 +1,88 @@
+/*
+ * Copyright (C) 2021-present ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see <http://www.gnu.org/licenses/>.
+ */
+#pragma once
+#include "service/raft/raft_group_registry.hh"
+#include "service/raft/discovery.hh"
+#include "service/raft/messaging.hh"
+
+namespace cql3 { class query_processor; }
+
+namespace gms { class gossiper; }
+
+namespace service {
+
+class migration_manager;
+
+class raft_group0 {
+public:
+ seastar::gate _shutdown_gate;
+ seastar::abort_source _abort_source;
+ raft_group_registry& _raft_gr;
+ netw::messaging_service& _ms;
+ gms::gossiper& _gossiper;
+ cql3::query_processor& _qp;
+ service::migration_manager& _mm;
+ // Status of leader discovery. Initially there is no group 0,
+ // and the variant contains no state. During initial cluster
+ // bootstrap a discovery object is created, which is then
+ // substituted by group0 id when a leader is discovered or
+ // created.
+ std::variant<std::monostate, service::discovery, raft::group_id> _group0;
+
+ raft_server_for_group create_server_for_group(raft::group_id id, raft::server_address my_addr);
+ future<raft::server_address> load_or_create_my_addr();
+public:
+ raft_group0(service::raft_group_registry& raft_gr,
+ netw::messaging_service& ms,
+ gms::gossiper& gs,
+ cql3::query_processor& qp,
+ migration_manager& mm);
+
+ future<> abort() {
+ _abort_source.request_abort();
+ return _shutdown_gate.close();
+ }
+
+ // A helper function to discover Raft Group 0 leader in
+ // absence of running group 0 server.
+ future<raft_leader_info> discover_leader(raft::server_address my_addr);
+ // A helper to run Raft RPC in a loop handling bounces
+ // and trying different leaders until it succeeds.
+ future<> rpc_until_success(raft::server_address addr, auto rpc);
+
+ // Join this node to the cluster-wide Raft group
+ // Called during bootstrap. Is idempotent - it
+ // does nothing if already done, or resumes from the
+ // unfinished state if aborted. The result is that
+ // raft service has group 0 running.
+ future<> join_raft_group0();
+
+ // Remove the node from the cluster-wide raft group.
+ // This procedure is idempotent. In case of replace node,
+ // it removes the replaced node from the group, since
+ // it can't do it by itself (it's dead).
+ future<> leave_raft_group0();
+
+ // Handle peer_exchange RPC
+ future<raft_peer_exchange> peer_exchange(discovery::peer_list peers);
+};
+
+} // end of namespace service
diff --git a/service/raft/raft_group_registry.hh b/service/raft/raft_group_registry.hh
index 018f67e4f2..b9af1c3ae9 100644
--- a/service/raft/raft_group_registry.hh
+++ b/service/raft/raft_group_registry.hh
@@ -22,24 +22,14 @@

#include <seastar/core/future.hh>
#include <seastar/core/sharded.hh>
+#include <seastar/core/gate.hh>

#include "message/messaging_service_fwd.hh"
-#include "gms/inet_address.hh"
#include "raft/raft.hh"
#include "raft/server.hh"
#include "service/raft/raft_address_map.hh"

-namespace cql3 {
-
-class query_processor;
-
-} // namespace cql3
-
-namespace gms {
-
-class gossiper;
-
-} // namespace gms
+namespace gms { class gossiper; }

namespace service {

@@ -68,41 +58,51 @@ struct raft_server_for_group {
// to the owning shard for a given raft group_id.
class raft_group_registry : public seastar::peering_sharded_service<raft_group_registry> {
private:
+ seastar::gate _shutdown_gate;
netw::messaging_service& _ms;
- gms::gossiper& _gossiper;
- sharded<cql3::query_processor>& _qp;
- // Shard-local failure detector instance shared among all raft groups
- shared_ptr<raft_gossip_failure_detector> _fd;
-
// Raft servers along with the corresponding timers to tick each instance.
// Currently ticking every 100ms.
std::unordered_map<raft::group_id, raft_server_for_group> _servers;
// inet_address:es for remote raft servers known to us
raft_address_map<> _srv_address_mappings;
+ // Shard-local failure detector instance shared among all raft groups
+ seastar::shared_ptr<raft_gossip_failure_detector> _fd;

void init_rpc_verbs();
seastar::future<> uninit_rpc_verbs();
seastar::future<> stop_servers();

- raft_server_for_group create_server_for_group(raft::group_id id);
-
raft_server_for_group& server_for_group(raft::group_id id);
-public:

- raft_group_registry(netw::messaging_service& ms, gms::gossiper& gs, sharded<cql3::query_processor>& qp);
- // To integrate Raft service into the boot procedure, the
- // initialization is split into 2 steps:
- // - in sharded::start(), we construct an instance of
- // raft_group_registry on each shard. The RPC is not available
- // yet and no groups are created. The created object is
- // useless but it won't crash on use.
- // - in init(), which must be invoked explicitly on each
- // shard after the query processor and database have started,
- // we boot all existing groups from the local system tables
- // and start RPC
- seastar::future<> init();
- // Must be invoked explicitly on each shard to stop this service.
- seastar::future<> uninit();
+ // Group 0 id, valid only on shard 0 after boot is over
+ std::optional<raft::group_id> _group0_id;
+public:
+ // A helper function to get the current Raft group leader.
+ // Checks if this server is a leader, and if it's the
+ // case, returns its address.
+ // Otherwise one of two cases is possible:
+ // 1. This is a follower of a stable leader.
+ // Returns the address of this leader.
+ // 2. This server is not a leader and not a follower of an
+ // existing leader. E.g. election is in progress or this server
+ // lost leadership just recently and is a follower with no
+ // leader. Instead of waiting for the election to finish,
+ // orders the configuration lexicographically and returns the
+ // next server address. This helps avoid a deadlock if, e.g.,
+ // this server is partitioned away from the majority: rather
+ // than waiting for the election, perhaps indefinitely, we
+ // switch to another server.
+ //
+ // Internal errors are propagated up unchanged.
+ //
+ future<raft::server_address> guess_leader(raft::group_id gid);
+
+ raft_group_registry(netw::messaging_service& ms, gms::gossiper& gs);
+
+ // Called manually at start
+ seastar::future<> start();
+ // Called by sharded<>::stop()
+ seastar::future<> stop();

raft_rpc& get_rpc(raft::group_id gid);

@@ -110,10 +110,15 @@ class raft_group_registry : public seastar::peering_sharded_service<raft_group_r
// there is no such group.
raft::server& get_server(raft::group_id gid);

+ // Return an instance of group 0. Valid only on shard 0,
+ // after boot is complete
+ raft::server& group0();
+
// Start raft server instance, store in the map of raft servers and
// arm the associated timer to tick the server.
future<> start_server_for_group(raft_server_for_group grp);
unsigned shard_for_group(const raft::group_id& gid) const;
+ shared_ptr<raft_gossip_failure_detector>& failure_detector() { return _fd; }
raft_address_map<>& address_map() { return _srv_address_mappings; }
};

diff --git a/service/storage_service.hh b/service/storage_service.hh
index d47037e595..d2c4fc385e 100644
--- a/service/storage_service.hh
+++ b/service/storage_service.hh
@@ -78,6 +78,8 @@ namespace service {
class raft_group_registry;
}

+namespace cql3 { class query_processor; }
+
namespace cql_transport { class controller; }

namespace cdc {
@@ -105,6 +107,7 @@ namespace service {

class storage_service;
class migration_manager;
+class raft_group0;

extern distributed<storage_service> _the_storage_service;
// DEPRECATED, DON'T USE!
@@ -179,6 +182,7 @@ class storage_service : public service::migration_listener, public gms::i_endpoi
gms::gossiper& _gossiper;
// Container for all Raft instances running on this shard.
raft_group_registry& _raft_gr;
+ std::unique_ptr<service::raft_group0> _group0;
sharded<netw::messaging_service>& _messaging;
sharded<service::migration_manager>& _migration_manager;
sharded<repair_service>& _repair;
@@ -432,7 +436,7 @@ class storage_service : public service::migration_listener, public gms::i_endpoi
*
* \see init_messaging_service_part
*/
- future<> init_server(bind_messaging_port do_bind = bind_messaging_port::yes);
+ future<> init_server(cql3::query_processor& qp, bind_messaging_port do_bind = bind_messaging_port::yes);

future<> join_cluster();

diff --git a/main.cc b/main.cc
index 04757e9b84..5bcda33d2a 100644
--- a/main.cc
+++ b/main.cc
@@ -869,11 +869,16 @@ int main(int ac, char** av) {
gossiper.start(std::ref(stop_signal.as_sharded_abort_source()), std::ref(feature_service), std::ref(token_metadata), std::ref(messaging), std::ref(*cfg), std::ref(gcfg)).get();
// #293 - do not stop anything
//engine().at_exit([]{ return gms::get_gossiper().stop(); });
- supervisor::notify("starting Raft service");
- raft_gr.start(std::ref(messaging), std::ref(gossiper), std::ref(qp)).get();
+ supervisor::notify("starting Raft Group Registry service");
+ raft_gr.start(std::ref(messaging), std::ref(gossiper)).get();
+ // XXX: stop_raft has to happen before query_processor
+ // is stopped, since some groups keep using the query
+ // processor until they are stopped inside stop_raft.
auto stop_raft = defer_verbose_shutdown("Raft", [&raft_gr] {
raft_gr.stop().get();
});
+ raft_gr.invoke_on_all(&service::raft_group_registry::start).get();
+
supervisor::notify("initializing storage service");
service::storage_service_config sscfg;
sscfg.available_memory = memory::stats().total_memory();
@@ -1103,11 +1108,6 @@ int main(int ac, char** av) {
auto stop_proxy_handlers = defer_verbose_shutdown("storage proxy RPC verbs", [&proxy] {
proxy.invoke_on_all(&service::storage_proxy::uninit_messaging_service).get();
});
- supervisor::notify("starting Raft RPC");
- raft_gr.invoke_on_all(&service::raft_group_registry::init).get();
- auto stop_raft_rpc = defer_verbose_shutdown("Raft RPC", [&raft_gr] {
- raft_gr.invoke_on_all(&service::raft_group_registry::uninit).get();
- });
supervisor::notify("starting streaming service");
streaming::stream_session::init_streaming_service(db, sys_dist_ks, view_update_generator, messaging, mm).get();
auto stop_streaming_service = defer_verbose_shutdown("streaming service", [] {
@@ -1217,7 +1217,7 @@ int main(int ac, char** av) {
});

with_scheduling_group(maintenance_scheduling_group, [&] {
- return ss.init_server();
+ return ss.init_server(qp.local());
}).get();

sst_format_selector.sync();
diff --git a/message/messaging_service.cc b/message/messaging_service.cc
index 51dcdff492..602c7dd248 100644
--- a/message/messaging_service.cc
+++ b/message/messaging_service.cc
@@ -67,6 +67,7 @@
#include "idl/paxos.dist.hh"
#include "idl/raft.dist.hh"
#include "idl/hinted_handoff.dist.hh"
+#include "idl/group0.dist.hh"
#include "serializer_impl.hh"
#include "serialization_visitors.hh"
#include "idl/consistency_level.dist.impl.hh"
@@ -90,6 +91,7 @@
#include "idl/messaging_service.dist.impl.hh"
#include "idl/paxos.dist.impl.hh"
#include "idl/raft.dist.impl.hh"
+#include "idl/group0.dist.impl.hh"
#include "idl/hinted_handoff.dist.impl.hh"
#include <seastar/rpc/lz4_compressor.hh>
#include <seastar/rpc/lz4_fragmented_compressor.hh>
@@ -547,6 +549,11 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
case messaging_verb::GOSSIP_ECHO:
case messaging_verb::GOSSIP_GET_ENDPOINT_STATES:
case messaging_verb::GET_SCHEMA_VERSION:
+ // Raft peer exchange is mainly running at boot, but still
+ // should not be blocked by any data requests.
+ case messaging_verb::RAFT_PEER_EXCHANGE:
+ case messaging_verb::RAFT_ADD_SERVER:
+ case messaging_verb::RAFT_REMOVE_SERVER:
return 0;
case messaging_verb::PREPARE_MESSAGE:
case messaging_verb::PREPARE_DONE_MESSAGE:
@@ -1582,6 +1589,41 @@ future<db::hints::sync_point_check_response> messaging_service::send_hint_sync_p
return send_message_timeout<future<db::hints::sync_point_check_response>>(this, messaging_verb::HINT_SYNC_POINT_CHECK, std::move(id), timeout, std::move(request));
}

+void messaging_service::register_raft_peer_exchange(std::function<future<service::raft_peer_exchange>(const rpc::client_info&, rpc::opt_time_point, std::vector<raft::server_address>)>&& func) {
+ register_handler(this, netw::messaging_verb::RAFT_PEER_EXCHANGE, std::move(func));
+}
+future<> messaging_service::unregister_raft_peer_exchange() {
+ return unregister_handler(netw::messaging_verb::RAFT_PEER_EXCHANGE);
+}
+future<service::raft_peer_exchange> messaging_service::send_raft_peer_exchange(msg_addr id, clock_type::time_point timeout, const std::vector<raft::server_address>& peers) {
+ return send_message_timeout<service::raft_peer_exchange>(this, messaging_verb::RAFT_PEER_EXCHANGE, std::move(id), timeout, peers);
+}
+
+void messaging_service::register_raft_add_server(std::function<future<service::raft_success_or_bounce>(const rpc::client_info&, rpc::opt_time_point, raft::group_id gid, raft::server_address addr)>&& func) {
+ register_handler(this, netw::messaging_verb::RAFT_ADD_SERVER, std::move(func));
+}
+
+future<> messaging_service::unregister_raft_add_server() {
+ return unregister_handler(netw::messaging_verb::RAFT_ADD_SERVER);
+}
+
+future<service::raft_success_or_bounce> messaging_service::send_raft_add_server(msg_addr id, clock_type::time_point timeout, raft::group_id gid, raft::server_address addr) {
+ return send_message_timeout<service::raft_success_or_bounce>(this, messaging_verb::RAFT_ADD_SERVER, std::move(id), timeout, gid, addr);
+}
+
+void messaging_service::register_raft_remove_server(std::function<future<service::raft_success_or_bounce>(const rpc::client_info&, rpc::opt_time_point, raft::group_id gid, raft::server_id sid)>&& func) {
+ register_handler(this, netw::messaging_verb::RAFT_REMOVE_SERVER, std::move(func));
+}
+
+future<> messaging_service::unregister_raft_remove_server() {
+ return unregister_handler(netw::messaging_verb::RAFT_REMOVE_SERVER);
+}
+
+future<service::raft_success_or_bounce> messaging_service::send_raft_remove_server(msg_addr id, clock_type::time_point timeout, raft::group_id gid, raft::server_id sid) {
+ return send_message_timeout<service::raft_success_or_bounce>(this, messaging_verb::RAFT_REMOVE_SERVER, std::move(id), timeout, gid, sid);
+}
+
+
void init_messaging_service(sharded<messaging_service>& ms,
messaging_service::config mscfg, netw::messaging_service::scheduling_config scfg,
sstring ms_trust_store, sstring ms_cert, sstring ms_key, sstring ms_tls_prio, bool ms_client_auth) {
diff --git a/service/raft/raft_group0.cc b/service/raft/raft_group0.cc
new file mode 100644
index 0000000000..d68c538d21
--- /dev/null
+++ b/service/raft/raft_group0.cc
@@ -0,0 +1,301 @@
+/*
+ * Copyright (C) 2021-present ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see <http://www.gnu.org/licenses/>.
+ */
+#include "service/raft/raft_group0.hh"
+#include "service/raft/raft_rpc.hh"
+#include "service/raft/raft_gossip_failure_detector.hh"
+#include "service/raft/raft_sys_table_storage.hh"
+#include "service/raft/schema_raft_state_machine.hh"
+
+#include "message/messaging_service.hh"
+#include "cql3/query_processor.hh"
+#include "gms/gossiper.hh"
+#include "db/system_keyspace.hh"
+
+#include <seastar/core/smp.hh>
+#include <seastar/core/sleep.hh>
+#include <seastar/core/coroutine.hh>
+#include <seastar/util/log.hh>
+#include <seastar/util/defer.hh>
+
+namespace service {
+
+raft_group0::raft_group0(raft_group_registry& raft_gr,
+ netw::messaging_service& ms,
+ gms::gossiper& gs,
+ cql3::query_processor& qp,
+ service::migration_manager& mm)
+ : _raft_gr(raft_gr), _ms(ms), _gossiper(gs), _qp(qp), _mm(mm)
+{
+}
+
+seastar::future<raft::server_address> raft_group0::load_or_create_my_addr() {
+ assert(this_shard_id() == 0);
+ raft::server_address my_addr;
+ my_addr.id = raft::server_id{co_await db::system_keyspace::get_raft_server_id()};
+ if (my_addr.id == raft::server_id{}) {
+ my_addr.id = raft::server_id::create_random_id();
+ co_await db::system_keyspace::set_raft_server_id(my_addr.id.id);
+ }
+ my_addr.info = ser::serialize_to_buffer<bytes>(_gossiper.get_broadcast_address());
+ co_return my_addr;
+}
+
+raft_server_for_group raft_group0::create_server_for_group(raft::group_id gid,
+ raft::server_address my_addr) {
+
+ _raft_gr.address_map().set(my_addr.id,
+ ser::deserialize_from_buffer(my_addr.info, boost::type<gms::inet_address>{}),
+ false);
+ auto rpc = std::make_unique<raft_rpc>(_ms, _raft_gr.address_map(), gid, my_addr.id);
+ // Keep a reference to a specific RPC class.
+ auto& rpc_ref = *rpc;
+ auto storage = std::make_unique<raft_sys_table_storage>(_qp, gid);
+ auto state_machine = std::make_unique<schema_raft_state_machine>();
+ auto server = raft::create_server(my_addr.id, std::move(rpc), std::move(state_machine),
+ std::move(storage), _raft_gr.failure_detector(), raft::server::configuration());
+
+ // initialize the corresponding timer to tick the raft server instance
+ auto ticker = std::make_unique<raft_ticker_type>([srv = server.get()] { srv->tick(); });
+
+ return raft_server_for_group{
+ .gid = std::move(gid),
+ .server = std::move(server),
+ .ticker = std::move(ticker),
+ .rpc = rpc_ref,
+ };
+}
+
+future<raft_leader_info>
+raft_group0::discover_leader(raft::server_address my_addr) {
+ std::vector<raft::server_address> seeds;
+ seeds.reserve(_gossiper.get_seeds().size());
+ for (auto& seed : _gossiper.get_seeds()) {
+ if (seed == _gossiper.get_broadcast_address()) {
+ continue;
+ }
+ seeds.push_back({.info = ser::serialize_to_buffer<bytes>(seed)});
+ }
+ _group0 = discovery{my_addr, std::move(seeds)};
+ service::discovery& discovery = std::get<service::discovery>(_group0);
+ auto clear_discovery = defer([this] {
+ _group0 = std::monostate{};
+ });
+
+ struct tracker {
+ explicit tracker(discovery::output output_arg) : output(std::move(output_arg)) {}
+ discovery::output output;
+ promise<std::optional<raft_leader_info>> leader;
+ bool is_set = false;
+ void set_value(std::optional<raft_leader_info> opt_leader) {
+ is_set = true;
+ leader.set_value(std::move(opt_leader));
+ }
+ void set_exception() {
+ is_set = true;
+ leader.set_exception(std::current_exception());
+ }
+ };
+ // Send peer information to all known peers. If replies
+ // discover new peers, send peer information to them as well.
+// As soon as we get leader information from any peer,
+ // return it. If there is no chosen leader, collect replies
+ // from all peers, then make this node the leader if it has
+ // the smallest id. Otherwise sleep and keep pinging peers
+ // till some other node becomes a leader and shares its leader_info
+ // with us.
+ while (true) {
+ auto tracker = make_lw_shared<struct tracker>(discovery.get_output());
+ if (std::holds_alternative<discovery::i_am_leader>(tracker->output)) {
+ co_return raft_leader_info{.addr = my_addr};
+ }
+ if (std::holds_alternative<discovery::pause>(tracker->output)) {
+ co_await seastar::sleep_abortable(std::chrono::milliseconds(100), _abort_source);
+ continue;
+ }
+ auto& request_list = std::get<discovery::request_list>(tracker->output);
+ auto timeout = db::timeout_clock::now() + std::chrono::milliseconds{100};
+ (void) parallel_for_each(request_list, [this, tracker, timeout, &discovery] (
+ std::pair<raft::server_address, discovery::peer_list>& req) -> future<> {
+ netw::msg_addr peer(ser::deserialize_from_buffer(req.first.info,
+ boost::type<gms::inet_address>{}));
+ co_await with_gate(_shutdown_gate,
+ [this, tracker, &discovery, peer, timeout, from = std::move(req.first), msg = std::move(req.second)] () -> future<> {
+
+ rslog.info("Sending discovery message to {}", peer);
+ auto reply = co_await _ms.send_raft_peer_exchange(peer, timeout, std::move(msg));
+ // Check if this loop iteration has completed already
+ // before accessing discovery, which may be gone.
+ if (tracker->is_set) {
+ co_return;
+ }
+ if (std::holds_alternative<discovery::peer_list>(reply.info)) {
+ discovery.response(from, std::move(std::get<discovery::peer_list>(reply.info)));
+ } else if (std::holds_alternative<raft_leader_info>(reply.info)) {
+ tracker->set_value(std::move(std::get<raft_leader_info>(reply.info)));
+ }
+ });
+ }).then_wrapped([tracker] (future<> f) -> future<> {
+ // When the leader is discovered, silence all
+ // errors.
+ if (tracker->is_set) {
+ co_return;
+ }
+ // Silence all runtime errors, such as host
+ // unreachable. Propagate rpc and system errors up.
+ try {
+ co_await std::move(f);
+ tracker->set_value({});
+ } catch (std::exception& e) {
+ if (dynamic_cast<std::runtime_error*>(&e) == nullptr) {
+ tracker->set_exception();
+ } else {
+ rslog.debug("discovery failed to send message: {}", e);
+ tracker->set_value({});
+ }
+ }
+ });
+ if (auto leader = co_await tracker->leader.get_future()) {
+ co_return *leader;
+ }
+ }
+}
+
+// Handle bounce response from the peer and redirect RPC
+// to the bounced peer, until success.
+// XXX handle shutdown
+future<> raft_group0::rpc_until_success(raft::server_address addr, auto rpc) {
+ while (true) {
+ auto success_or_bounce = co_await with_gate(_shutdown_gate, [this, &addr, &rpc] () ->
+ future<raft_success_or_bounce> {
+ auto timeout = db::timeout_clock::now() + std::chrono::milliseconds{1000};
+ netw::msg_addr peer(ser::deserialize_from_buffer(addr.info, boost::type<gms::inet_address>{}));
+ try {
+ co_return co_await rpc(peer, timeout);
+ } catch (std::exception& e) {
+ if (dynamic_cast<std::runtime_error*>(&e) != nullptr) {
+ // Re-try with the same peer.
+ co_return raft_success_or_bounce{.bounce = addr};
+ } else {
+ throw;
+ }
+ }
+ });
+ if (!success_or_bounce.bounce) {
+ break;
+ }
+ addr = *success_or_bounce.bounce;
+ co_await seastar::sleep_abortable(std::chrono::milliseconds(100), _abort_source);
+ }
+}
+
+future<> raft_group0::join_raft_group0() {
+ assert(this_shard_id() == 0);
+ auto my_addr = co_await load_or_create_my_addr();
+ raft::group_id group0_id = raft::group_id{co_await db::system_keyspace::get_raft_group0_id()};
+ if (group0_id != raft::group_id{}) {
+ co_await _raft_gr.start_server_for_group(create_server_for_group(group0_id, my_addr));
+ _group0 = group0_id;
+ co_return;
+ }
+ raft::server_address leader;
+
+ auto li = co_await discover_leader(my_addr);
+ group0_id = li.group0_id;
+ leader = li.addr;
+
+ raft::configuration initial_configuration;
+ if (leader.id == my_addr.id) {
+ // Time-based ordering for group identifiers may be
+ // useful to provide linearisability between group
+ // operations. Currently it's unused.
+ group0_id = raft::group_id{utils::UUID_gen::get_time_UUID()};
+ initial_configuration.current.emplace(my_addr);
+ }
+ auto grp = create_server_for_group(group0_id, my_addr);
+ co_await grp.server->bootstrap(std::move(initial_configuration));
+ co_await _raft_gr.start_server_for_group(std::move(grp));
+ if (leader.id != my_addr.id) {
+ co_await rpc_until_success(leader, [this, group0_id, my_addr] (auto peer, auto timeout)
+ -> future<raft_success_or_bounce> {
+ rslog.info("Sending add node RPC to {}", peer);
+ co_return co_await _ms.send_raft_add_server(peer, timeout, group0_id, my_addr);
+ });
+ }
+ co_await db::system_keyspace::set_raft_group0_id(group0_id.id);
+ // Allow peer_exchange() RPC to access group 0 only after group0_id is persisted.
+ _group0 = group0_id;
+}
+
+future<> raft_group0::leave_raft_group0() {
+ assert(this_shard_id() == 0);
+ raft::server_address my_addr;
+ my_addr.id = raft::server_id{co_await db::system_keyspace::get_raft_server_id()};
+ if (my_addr.id == raft::server_id{}) {
+ // Nothing to do
+ co_return;
+ }
+ my_addr.info = ser::serialize_to_buffer<bytes>(_gossiper.get_broadcast_address());
+ raft::group_id group0_id;
+ raft::server_address leader;
+ if (std::holds_alternative<raft::group_id>(_group0)) {
+ group0_id = std::get<raft::group_id>(_group0);
+ } else {
+ group0_id = raft::group_id{co_await db::system_keyspace::get_raft_group0_id()};
+ if (group0_id == raft::group_id{}) {
+ auto li = co_await discover_leader(my_addr);
+ group0_id = li.group0_id;
+ leader = li.addr;
+ if (leader.id == my_addr.id) {
+ co_return;
+ }
+ }
+ co_await _raft_gr.start_server_for_group(create_server_for_group(group0_id, my_addr));
+ }
+ leader = co_await _raft_gr.guess_leader(group0_id);
+ co_await rpc_until_success(leader, [this, group0_id, my_addr] (auto peer, auto timeout)
+ -> future<raft_success_or_bounce> {
+ co_return co_await _ms.send_raft_remove_server(peer, timeout, group0_id, my_addr.id);
+ });
+}
+
+future<raft_peer_exchange> raft_group0::peer_exchange(discovery::peer_list peers) {
+ return std::visit([this, peers = std::move(peers)] (auto&& d) -> future<raft_peer_exchange> {
+ using T = std::decay_t<decltype(d)>;
+ if constexpr (std::is_same_v<T, std::monostate>) {
+ // Discovery not started or we're persisting the
+ // leader information locally.
+ co_return raft_peer_exchange{std::monostate{}};
+ } else if constexpr (std::is_same_v<T, discovery>) {
+ // Use discovery to produce a response
+ co_return raft_peer_exchange{d.request(std::move(peers))};
+ } else if constexpr (std::is_same_v<T, raft::group_id>) {
+ // Obtain leader information from existing Group 0 server.
+ auto addr = co_await _raft_gr.guess_leader(std::get<raft::group_id>(_group0));
+ co_return raft_peer_exchange{raft_leader_info{
+ .group0_id = std::get<raft::group_id>(_group0),
+ .addr = addr
+ }};
+ }
+ }, _group0);
+}
+
+} // end of namespace service
+
diff --git a/service/raft/raft_group_registry.cc b/service/raft/raft_group_registry.cc
index 685e23941c..08ce811ec8 100644
--- a/service/raft/raft_group_registry.cc
+++ b/service/raft/raft_group_registry.cc
@@ -20,28 +20,20 @@
*/
#include "service/raft/raft_group_registry.hh"
#include "service/raft/raft_rpc.hh"
-#include "service/raft/raft_sys_table_storage.hh"
-#include "service/raft/schema_raft_state_machine.hh"
#include "service/raft/raft_gossip_failure_detector.hh"
-
#include "message/messaging_service.hh"
-#include "cql3/query_processor.hh"
-#include "gms/gossiper.hh"
+#include "serializer_impl.hh"
+

-#include <seastar/core/smp.hh>
#include <seastar/core/coroutine.hh>
-#include <seastar/core/on_internal_error.hh>
-#include <seastar/util/log.hh>
-#include <seastar/util/defer.hh>

namespace service {

logging::logger rslog("raft_group_registry");

-raft_group_registry::raft_group_registry(netw::messaging_service& ms, gms::gossiper& gs, sharded<cql3::query_processor>& qp)
- : _ms(ms), _gossiper(gs), _qp(qp), _fd(make_shared<raft_gossip_failure_detector>(gs, _srv_address_mappings))
+raft_group_registry::raft_group_registry(netw::messaging_service& ms, gms::gossiper& gossiper)
+ : _ms(ms), _fd(make_shared<raft_gossip_failure_detector>(gossiper, _srv_address_mappings))
{
- (void) _gossiper;
}

void raft_group_registry::init_rpc_verbs() {
@@ -107,6 +99,56 @@ void raft_group_registry::init_rpc_verbs() {
return make_ready_future<>();
});
});
+
+ auto cas_or_bounce = [this](raft::group_id gid, auto cas) -> future<raft_success_or_bounce> {
+ raft::server& server = get_server(gid);
+ try {
+ co_await server.cas(cas);
+ co_return raft_success_or_bounce{};
+ } catch (const std::exception& e) {
+ if (! raft::is_transient_error(e)) {
+ throw;
+ }
+ }
+ auto bounce = co_await guess_leader(gid);
+ co_return raft_success_or_bounce{.bounce = bounce};
+ };
+
+ auto raft_add_server_impl = [this, cas_or_bounce] (const rpc::client_info& cinfo,
+ rpc::opt_time_point timeout, raft::group_id gid, raft::server_address addr)
+ -> future<raft_success_or_bounce> {
+
+ return container().invoke_on(shard_for_group(gid), [gid, addr, cas_or_bounce] (
+ raft_group_registry& self) -> future<raft_success_or_bounce> {
+
+ return cas_or_bounce(gid, [addr] (const raft::server& server) -> future<raft::server_address_set> {
+ rslog.info("Adding node to Raft Group 0 {}",
+ ser::deserialize_from_buffer(addr.info, boost::type<gms::inet_address>{}));
+ auto configuration = server.get_configuration().current;
+ configuration.emplace(std::move(addr));
+ co_return configuration;
+ });
+ });
+ };
+
+ _ms.register_raft_add_server(raft_add_server_impl);
+
+ auto raft_remove_server_impl = [this, cas_or_bounce] (const rpc::client_info& cinfo,
+ rpc::opt_time_point timeout, raft::group_id gid, raft::server_id sid) -> future<raft_success_or_bounce> {
+
+ return container().invoke_on(shard_for_group(gid), [gid, sid, cas_or_bounce] (
+ raft_group_registry& self) -> future<raft_success_or_bounce> {
+
+ return cas_or_bounce(gid, [sid] (const raft::server& server) -> future<raft::server_address_set> {
+ auto configuration = server.get_configuration().current;
+ rslog.info("Removing node to Raft Group 0 {}", sid);
+ configuration.erase(raft::server_address{.id = sid});
+ co_return configuration;
+ });
+ });
+ };
+
+ _ms.register_raft_remove_server(raft_remove_server_impl);
}

future<> raft_group_registry::uninit_rpc_verbs() {
@@ -116,7 +158,9 @@ future<> raft_group_registry::uninit_rpc_verbs() {
_ms.unregister_raft_append_entries_reply(),
_ms.unregister_raft_vote_request(),
_ms.unregister_raft_vote_reply(),
- _ms.unregister_raft_timeout_now()
+ _ms.unregister_raft_timeout_now(),
+ _ms.unregister_raft_add_server(),
+ _ms.unregister_raft_remove_server()
).discard_result();
}

@@ -129,17 +173,19 @@ future<> raft_group_registry::stop_servers() {
co_await when_all_succeed(stop_futures.begin(), stop_futures.end());
}

-seastar::future<> raft_group_registry::init() {
+seastar::future<> raft_group_registry::start() {
// Once a Raft server starts, it soon times out
// and starts an election, so RPC must be ready by
// then to send VoteRequest messages.
co_return init_rpc_verbs();
}

-seastar::future<> raft_group_registry::uninit() {
- return uninit_rpc_verbs().then([this] {
- return stop_servers();
- });
+seastar::future<> raft_group_registry::stop() {
+ co_await when_all_succeed(
+ _shutdown_gate.close(),
+ uninit_rpc_verbs(),
+ stop_servers()
+ ).discard_result();
}

raft_server_for_group& raft_group_registry::server_for_group(raft::group_id gid) {
@@ -158,26 +204,8 @@ raft::server& raft_group_registry::get_server(raft::group_id gid) {
return *(server_for_group(gid).server);
}

-raft_server_for_group raft_group_registry::create_server_for_group(raft::group_id gid) {
-
- raft::server_id my_id = raft::server_id::create_random_id();
- auto rpc = std::make_unique<raft_rpc>(_ms, _srv_address_mappings, gid, my_id);
- // Keep a reference to a specific RPC class.
- auto& rpc_ref = *rpc;
- auto storage = std::make_unique<raft_sys_table_storage>(_qp.local(), gid);
- auto state_machine = std::make_unique<schema_raft_state_machine>();
- auto server = raft::create_server(my_id, std::move(rpc), std::move(state_machine),
- std::move(storage), _fd, raft::server::configuration());
-
- // initialize the corresponding timer to tick the raft server instance
- auto ticker = std::make_unique<raft_ticker_type>([srv = server.get()] { srv->tick(); });
-
- return raft_server_for_group{
- .gid = std::move(gid),
- .server = std::move(server),
- .ticker = std::move(ticker),
- .rpc = rpc_ref,
- };
+raft::server& raft_group_registry::group0() {
+ return *(server_for_group(*_group0_id).server);
}

future<> raft_group_registry::start_server_for_group(raft_server_for_group new_grp) {
@@ -187,6 +215,9 @@ future<> raft_group_registry::start_server_for_group(raft_server_for_group new_g
if (!inserted) {
on_internal_error(rslog, format("Attempt to add the second instance of raft server with the same gid={}", gid));
}
+ if (_servers.size() == 1 && this_shard_id() == 0) {
+ _group0_id = gid;
+ }
auto& grp = it->second;
try {
// start the server instance prior to arming the ticker timer.
@@ -206,5 +237,52 @@ unsigned raft_group_registry::shard_for_group(const raft::group_id& gid) const {
return 0; // schema raft server is always owned by shard 0
}

+future<raft::server_address>
+raft_group_registry::guess_leader(raft::group_id gid) {
+ raft::server& server = get_server(gid);
+ co_return co_await with_gate(_shutdown_gate, [this, &server] () -> future<raft::server_address> {
+ try {
+ co_await server.read_barrier();
+ co_return _srv_address_mappings.get_server_address(server.id());
+ } catch (std::exception& e) {
+ if (auto n_a_l = dynamic_cast<raft::not_a_leader*>(&e)) {
+ if (n_a_l->leader != raft::server_id{}) {
+ auto it = _srv_address_mappings.find(n_a_l->leader);
+ if (it) {
+ auto addr = raft::server_address{
+ .id = n_a_l->leader,
+ .info = ser::serialize_to_buffer<bytes>(*it)
+ };
+ co_return addr;
+ }
+ }
+ // Fall through into the round-robin mode if there is
+ // no mapping for this raft server id or if there is
+ // no stable leader.
+ } else if (! raft::is_transient_error(e)) {
+ throw;
+ }
+ // We're in a state of uncertainty, an election is
+ // in progress.
+ std::set<raft::server_address> sorted(
+ server.get_configuration().current.begin(),
+ server.get_configuration().current.end());
+ assert(!sorted.empty());
+ auto it = sorted.find(raft::server_address{.id = server.id()});
+ // The group is put into registry on this server only
+ // after a majority commits the configuration change,
+ // but not necessarily this server was part of this
+ // majority. Hence, when round-robining over the list
+ // of servers we may arrive to a server which doesn't
+ // yet have itself as part of the config. In that case
+ // skip over to the first server which is part of the
+ // config.
+ if (it == sorted.end() || ++it == sorted.end()) {
+ co_return *sorted.begin();
+ }
+ co_return *it;
+ }
+ });
+}

} // end of namespace service
diff --git a/service/storage_service.cc b/service/storage_service.cc
index 4f27641455..b5fa6b096a 100644
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -48,6 +48,7 @@
#include "log.hh"
#include "service/migration_manager.hh"
#include "service/storage_proxy.hh"
+#include "service/raft/raft_group0.hh"
#include "to_string.hh"
#include "gms/gossiper.hh"
#include "gms/failure_detector.hh"
@@ -143,7 +144,6 @@ storage_service::storage_service(abort_source& abort_source,
if (snitch.local_is_initialized()) {
_listeners.emplace_back(make_lw_shared(snitch.local()->when_reconfigured(_snitch_reconfigure)));
}
- (void) _raft_gr;
}

void storage_service::enable_all_features() {
@@ -638,6 +638,15 @@ void storage_service::join_token_ring(int delay) {

// start participating in the ring.
set_gossip_tokens(_gossiper, _bootstrap_tokens, _cdc_gen_id);
+
+ // Until topology tables are managed by Raft, Gossip continues
+ // to be the primary source of truth, so initialize Raft only
+ // once gossip tokens have been persisted. This way, if the
+ // initialization fails, the node is still visible to
+ // removenode, which can be used to clean this node's state
+ // from the cluster.
+ _group0->join_raft_group0().get();
+
set_mode(mode::NORMAL, "node is now in normal status", true);

if (get_token_metadata().sorted_tokens().empty()) {
@@ -737,6 +746,7 @@ void storage_service::bootstrap() {
slogger.debug("Removing replaced endpoint {} from system.peers", *replace_addr);
db::system_keyspace::remove_endpoint(*replace_addr).get();
}
+ _group0->leave_raft_group0().get();
}

_db.invoke_on_all([this] (database& db) {
@@ -1394,12 +1404,15 @@ future<> storage_service::uninit_messaging_service_part() {
return container().invoke_on_all(&service::storage_service::uninit_messaging_service);
}

-future<> storage_service::init_server(bind_messaging_port do_bind) {
+future<> storage_service::init_server(cql3::query_processor& qp, bind_messaging_port do_bind) {
assert(this_shard_id() == 0);

- return seastar::async([this, do_bind] {
+ return seastar::async([this, &qp, do_bind] {
_initialized = true;

+ _group0 = std::make_unique<raft_group0>(_raft_gr, _messaging.local(), _gossiper, qp,
+ _migration_manager.local());
+
std::unordered_set<inet_address> loaded_endpoints;
if (_db.local().get_config().load_ring_state()) {
slogger.info("Loading persisted ring state");
@@ -1506,9 +1519,13 @@ future<> storage_service::stop() {
// make sure nobody uses the semaphore
node_ops_singal_abort(std::nullopt);
_listeners.clear();
- return _schema_version_publisher.join().finally([this] {
- return std::move(_node_ops_abort_thread);
- });
+ future<> group0_f =_group0 ? _group0->abort() : make_ready_future<>();
+ return when_all_succeed(
+ _schema_version_publisher.join().finally([this] {
+ return std::move(_node_ops_abort_thread);
+ }),
+ std::move(group0_f)
+ ).discard_result();
}

future<> storage_service::check_for_endpoint_collision(std::unordered_set<gms::inet_address> initial_contact_nodes, const std::unordered_map<gms::inet_address, sstring>& loaded_peer_features, bind_messaging_port do_bind) {
@@ -2371,6 +2388,7 @@ future<> storage_service::removenode(sstring host_id_string, std::list<gms::inet


// Step 5: Announce the node has left
+ ss._group0->leave_raft_group0().get();
ss._gossiper.advertise_token_removed(endpoint, host_id).get();
std::unordered_set<token> tmp(tokens.begin(), tokens.end());
ss.excise(std::move(tmp), endpoint);
@@ -3605,10 +3623,29 @@ void storage_service::init_messaging_service() {
_messaging.local().register_replication_finished([] (gms::inet_address from) {
return get_local_storage_service().confirm_replication(from);
});
+
+ auto peer_exchange_impl = [this](const rpc::client_info& cinfo, rpc::opt_time_point timeout,
+ discovery::peer_list peers) -> future<raft_peer_exchange> {
+
+ return container().invoke_on(0 /* group 0 is on shard 0 */, [peers = std::move(peers)] (
+ storage_service& self) -> future<raft_peer_exchange> {
+
+ if (self._group0) {
+ return self._group0->peer_exchange(std::move(peers));
+ } else {
+ return make_ready_future<raft_peer_exchange>(raft_peer_exchange{std::monostate{}});
+ }
+ });
+ };
+
+ _messaging.local().register_raft_peer_exchange(peer_exchange_impl);
}

future<> storage_service::uninit_messaging_service() {
- return _messaging.local().unregister_replication_finished();
+ return when_all_succeed(
+ _messaging.local().unregister_replication_finished(),
+ _messaging.local().unregister_raft_peer_exchange()
+ ).discard_result();
}

void storage_service::do_isolate_on_error(disk_error type)
@@ -3673,6 +3710,7 @@ future<> storage_service::force_remove_completion() {
slogger.warn("No host_id is found for endpoint {}", endpoint);
continue;
}
+ ss._group0->leave_raft_group0().get();
ss._gossiper.advertise_token_removed(endpoint, host_id).get();
std::unordered_set<token> tokens_set(tokens.begin(), tokens.end());
ss.excise(tokens_set, endpoint);
diff --git a/test/boost/gossip_test.cc b/test/boost/gossip_test.cc
index cb5da57a1b..3eb5ac836e 100644
--- a/test/boost/gossip_test.cc
+++ b/test/boost/gossip_test.cc
@@ -90,8 +90,9 @@ SEASTAR_TEST_CASE(test_boot_shutdown){
service::storage_service_config sscfg;
sscfg.available_memory = memory::stats().total_memory();

- raft_gr.start(std::ref(_messaging), std::ref(gms::get_gossiper()), std::ref(qp)).get();
+ raft_gr.start(std::ref(_messaging), std::ref(gms::get_gossiper())).get();
auto stop_raft = defer([&raft_gr] { raft_gr.stop().get(); });
+ raft_gr.invoke_on_all(&service::raft_group_registry::start).get();

service::get_storage_service().start(std::ref(abort_sources),
std::ref(db), std::ref(gms::get_gossiper()),
diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc
index dbb88e4e43..d90ce162a0 100644
--- a/test/lib/cql_test_env.cc
+++ b/test/lib/cql_test_env.cc
@@ -535,8 +535,8 @@ class single_node_cql_env : public cql_test_env {
sharded<repair_service> repair;
sharded<cql3::query_processor> qp;
sharded<service::raft_group_registry> raft_gr;
- raft_gr.start(std::ref(ms), std::ref(gms::get_gossiper()), std::ref(qp)).get();
- auto stop_raft = defer([&raft_gr] { raft_gr.stop().get(); });
+ raft_gr.start(std::ref(ms), std::ref(gms::get_gossiper())).get();
+ raft_gr.invoke_on_all(&service::raft_group_registry::start).get();

auto& ss = service::get_storage_service();
service::storage_service_config sscfg;
@@ -655,19 +655,17 @@ class single_node_cql_env : public cql_test_env {
auto stop_database_d = defer([&db] {
stop_database(db).get();
});
+ // XXX: stop_raft before stopping the database and
+ // query processor. Group registry stops raft groups
+ // when stopped, and until then the groups may use
+ // the database and the query processor.
+ auto stop_raft = defer([&raft_gr] { raft_gr.stop().get(); });

db::system_keyspace::init_local_cache().get();
auto stop_local_cache = defer([] { db::system_keyspace::deinit_local_cache().get(); });

sys_dist_ks.start(std::ref(qp), std::ref(mm), std::ref(proxy)).get();

- // We need to have a system keyspace started and
- // initialized to initialize Raft service.
- raft_gr.invoke_on_all(&service::raft_group_registry::init).get();
- auto stop_raft_rpc = defer([&raft_gr] {
- raft_gr.invoke_on_all(&service::raft_group_registry::uninit).get();
- });
-
cdc_generation_service.start(std::ref(*cfg), std::ref(gms::get_gossiper()), std::ref(sys_dist_ks), std::ref(abort_sources), std::ref(token_metadata), std::ref(feature_service)).get();
auto stop_cdc_generation_service = defer([&cdc_generation_service] {
cdc_generation_service.stop().get();
@@ -680,7 +678,7 @@ class single_node_cql_env : public cql_test_env {
cdc.stop().get();
});

- service::get_local_storage_service().init_server(service::bind_messaging_port(false)).get();
+ service::get_local_storage_service().init_server(qp.local(), service::bind_messaging_port(false)).get();
service::get_local_storage_service().join_cluster().get();

auth::permissions_cache_config perm_cache_config;
diff --git a/test/manual/gossip.cc b/test/manual/gossip.cc
index aceb2484d1..cd54aab091 100644
--- a/test/manual/gossip.cc
+++ b/test/manual/gossip.cc
@@ -96,7 +96,7 @@ int main(int ac, char ** av) {
messaging.start(listen).get();
gms::get_gossiper().start(std::ref(abort_sources), std::ref(feature_service), std::ref(token_metadata), std::ref(messaging), std::ref(*cfg)).get();

- raft_gr.start(std::ref(messaging), std::ref(gms::get_gossiper()), std::ref(qp)).get();
+ raft_gr.start(std::ref(messaging), std::ref(gms::get_gossiper())).get();
auto stop_raft = defer([&raft_gr] { raft_gr.stop().get(); });

service::init_storage_service(std::ref(abort_sources),
--
2.25.1
