Operations that add or remove a node in the Raft configuration
are made idempotent: they do nothing if already done, and
they are safe to resume after a failure.
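As a toy model (not the actual Scylla types), the idempotency
follows from the configuration being a set, so re-applying the
same change is a no-op:

    // Toy model: membership is a set, so a resumed add/remove is harmless.
    #include <set>
    #include <cassert>

    int main() {
        std::set<int> config{1, 2};
        config.emplace(3); // add node 3
        config.emplace(3); // resumed after a failure: no effect
        config.erase(4);   // removing an absent node is equally a no-op
        assert(config == (std::set<int>{1, 2, 3}));
    }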
However, since topology changes are not transactional, if a
bootstrap or removal procedure fails midway, the Raft group 0
configuration may go out of sync with the topology state as seen
by gossip.
In the future we must change gossip so that it makes no persistent
changes to the cluster: all changes to persistent topology state
will be done exclusively through Raft Group 0.
Specifically, instead of persisting tokens by advertising
them through gossip, bootstrap will commit a change to a system
table using Raft group 0, and nodetool will switch from looking at
gossip-managed tables to consulting the Raft Group 0 configuration.
Once this transformation is done, adding a node to the Raft
configuration will naturally become the first persistent change to
ring state applied when a node joins, and removing a node from the
Raft Group 0 configuration will become the last action when
removing a node.
Until this is done, we do our best to avoid a cluster state in
which a removed node, or a node whose addition failed, is stuck in
the Raft configuration while no longer present in gossip-managed
system tables. In other words, we keep gossip the primary source
of truth. For this purpose, carefully choose the timing of joining
and leaving Raft group 0:
Join Raft group 0 only after we've advertised our tokens, so that
the cluster is aware of this node and it is visible in nodetool
status, but before the node state jumps to "normal", i.e. before
it accepts queries. Since the operation is idempotent, invoke it
on each restart.
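A minimal sketch of this ordering, using hypothetical stand-ins
for the real bootstrap steps:

    // Hypothetical stand-ins, not the actual Scylla API.
    #include <iostream>

    void advertise_tokens_via_gossip() { std::cout << "tokens advertised\n"; }
    void join_raft_group0()            { std::cout << "joined group 0\n"; } // idempotent
    void enter_normal_state()          { std::cout << "state: normal\n"; }

    int main() {
        advertise_tokens_via_gossip(); // node becomes visible in nodetool status
        join_raft_group0();            // safe to call on every restart
        enter_normal_state();          // only now start accepting queries
    }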
Remove the node from Group 0 *before* its tokens are removed
from gossip-managed system tables. This guarantees
that if removal from Raft group 0 fails for whatever reason,
the node stays in the ring, so nodetool removenode and
friends can be re-tried.
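Sketched with the same kind of hypothetical stand-ins: if the
first step throws, gossip state is untouched and the whole
operation can simply be re-run:

    // Hypothetical stand-ins, not the actual Scylla API.
    void leave_raft_group0() { /* may throw; gossip state unchanged on failure */ }
    void remove_tokens_from_gossip_tables() { /* reached only on success */ }

    void removenode() {
        leave_raft_group0();                // step 1: leave group 0 first
        remove_tokens_from_gossip_tables(); // step 2: erase gossip state last
    }

    int main() { removenode(); }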
While previously the raft service was simply a container
for raft groups, it can now perform lengthy, blocking
I/O in a loop. This has to be interruptible, e.g. during
shutdown.
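For illustration only, here is the shape of such an interruptible
retry loop, modeled with std::atomic instead of seastar's
abort_source and sleep_abortable:

    #include <atomic>
    #include <chrono>
    #include <thread>

    std::atomic<bool> abort_requested{false}; // set from the shutdown path

    bool try_rpc() { return false; } // stand-in: pretend the call keeps failing

    void retry_loop() {
        while (!abort_requested.load()) { // re-checked on every iteration
            if (try_rpc()) {
                return;                   // success: stop retrying
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
        // shutdown requested: leave without completing the operation
    }

    int main() {
        abort_requested = true; // simulate shutdown
        retry_loop();
    }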
Add tracing
Make group 0 available on shard 0, directly from group registry.
Add RPC stubs for Raft topology changes
Add stubs for Raft RPC calls - leader discovery, raft_add_server and
raft_remove_server - used during topology changes.
Add stub implementations for RPC callbacks
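The calling convention of these stubs is "success or bounce"; a
condensed model of the retry loop around it (simplified types,
not the patch's exact signatures):

    #include <optional>
    #include <string>

    struct success_or_bounce {
        std::optional<std::string> bounce; // set => retry against this address
    };

    // Stand-in: a real implementation would perform a network call.
    success_or_bounce send_rpc(const std::string&) { return {}; }

    void rpc_until_success(std::string addr) {
        for (;;) {
            auto result = send_rpc(addr);
            if (!result.bounce) {
                break;             // the peer accepted the request
            }
            addr = *result.bounce; // follow the suggested leader and retry
        }
    }

    int main() { rpc_until_success("seed-address"); }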
---
configure.py | 2 +
idl/group0.idl.hh | 43 ++++
message/messaging_service.hh | 18 +-
service/raft/messaging.hh | 63 ++++++
service/raft/raft_group0.hh | 88 ++++++++
service/raft/raft_group_registry.hh | 73 +++----
service/storage_service.hh | 6 +-
main.cc | 16 +-
message/messaging_service.cc | 42 ++++
service/raft/raft_group0.cc | 301 ++++++++++++++++++++++++++++
service/raft/raft_group_registry.cc | 154 ++++++++++----
service/storage_service.cc | 52 ++++-
test/boost/gossip_test.cc | 3 +-
test/lib/cql_test_env.cc | 18 +-
test/manual/gossip.cc | 2 +-
15 files changed, 780 insertions(+), 101 deletions(-)
create mode 100644 idl/group0.idl.hh
create mode 100644 service/raft/messaging.hh
create mode 100644 service/raft/raft_group0.hh
create mode 100644 service/raft/raft_group0.cc
diff --git a/configure.py b/configure.py
index a2b31f8857..20a3a619da 100755
--- a/configure.py
+++ b/configure.py
@@ -1000,6 +1000,7 @@ scylla_core = (['database.cc',
'service/raft/raft_gossip_failure_detector.cc',
'service/raft/raft_group_registry.cc',
'service/raft/discovery.cc',
+ 'service/raft/raft_group0.cc',
] + [Antlr3Grammar('cql3/Cql.g')] + [Thrift('interface/cassandra.thrift', 'Cassandra')] \
+ scylla_raft_core
)
@@ -1099,6 +1100,7 @@ idls = ['idl/gossip_digest.idl.hh',
'idl/messaging_service.idl.hh',
'idl/paxos.idl.hh',
'idl/raft.idl.hh',
+ 'idl/group0.idl.hh',
'idl/hinted_handoff.idl.hh',
]
diff --git a/idl/group0.idl.hh b/idl/group0.idl.hh
new file mode 100644
index 0000000000..1c17d1779a
--- /dev/null
+++ b/idl/group0.idl.hh
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2021-present ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+namespace std {
+
+struct monostate {};
+
+}
+
+namespace service {
+
+struct raft_leader_info {
+ raft::group_id group0_id;
+ raft::server_address addr;
+};
+
+struct raft_peer_exchange {
+ std::variant<std::monostate, service::raft_leader_info, std::vector<raft::server_address>> info;
+};
+
+struct raft_success_or_bounce {
+ std::optional<raft::server_address> bounce;
+};
+
+} // namespace service
diff --git a/message/messaging_service.hh b/message/messaging_service.hh
index 55251117ff..84a04c3488 100644
--- a/message/messaging_service.hh
+++ b/message/messaging_service.hh
@@ -41,6 +41,7 @@
#include "cache_temperature.hh"
#include "service/paxos/prepare_response.hh"
#include "raft/raft.hh"
+#include "service/raft/messaging.hh"
#include "db/hints/messages.hh"
#include <list>
@@ -162,7 +163,10 @@ enum class messaging_verb : int32_t {
RAFT_TIMEOUT_NOW = 51,
HINT_SYNC_POINT_CREATE = 52,
HINT_SYNC_POINT_CHECK = 53,
- LAST = 54,
+ RAFT_PEER_EXCHANGE = 54,
+ RAFT_ADD_SERVER = 55,
+ RAFT_REMOVE_SERVER = 56,
+ LAST = 57,
};
} // namespace netw
@@ -593,6 +597,18 @@ class messaging_service : public seastar::async_sharded_service<messaging_servic
future<> unregister_raft_timeout_now();
future<> send_raft_timeout_now(msg_addr id, clock_type::time_point timeout, raft::group_id, raft::server_id from_id, raft::server_id dst_id, const raft::timeout_now& timeout_now);
+ void register_raft_peer_exchange(std::function<future<service::raft_peer_exchange> (const rpc::client_info&, rpc::opt_time_point, std::vector<raft::server_address>)>&& func);
+ future<> unregister_raft_peer_exchange();
+ future<service::raft_peer_exchange> send_raft_peer_exchange(msg_addr id, clock_type::time_point timeout, const std::vector<raft::server_address>& peers);
+
+ void register_raft_add_server(std::function<future<service::raft_success_or_bounce>(const rpc::client_info&, rpc::opt_time_point, raft::group_id gid, raft::server_address addr)>&& func);
+ future<> unregister_raft_add_server();
+ future<service::raft_success_or_bounce> send_raft_add_server(msg_addr id, clock_type::time_point timeout, raft::group_id gid, raft::server_address addr);
+
+ void register_raft_remove_server(std::function<future<service::raft_success_or_bounce>(const rpc::client_info&, rpc::opt_time_point, raft::group_id gid, raft::server_id sid)>&& func);
+ future<> unregister_raft_remove_server();
+ future<service::raft_success_or_bounce> send_raft_remove_server(msg_addr id, clock_type::time_point timeout, raft::group_id gid, raft::server_id sid);
+
void foreach_server_connection_stats(std::function<void(const rpc::client_info&, const rpc::stats&)>&& f) const;
private:
bool remove_rpc_client_one(clients_map& clients, msg_addr id, bool dead_only);
diff --git a/service/raft/messaging.hh b/service/raft/messaging.hh
new file mode 100644
index 0000000000..0e104d6d92
--- /dev/null
+++ b/service/raft/messaging.hh
@@ -0,0 +1,63 @@
+/*
+ * Copyright (C) 2021-present ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see <http://www.gnu.org/licenses/>.
+ */
+#pragma once
+#include "raft/raft.hh"
+/////////////////////////////////////////
+// Discovery RPC supporting message types
+
+namespace service {
+
+// Used in a bootstrapped Scylla cluster, provides the group 0
+// identifier and the current group leader address.
+struct raft_leader_info {
+ raft::group_id group0_id;
+ raft::server_address addr;
+ bool operator==(const raft_leader_info& rhs) const {
+ return rhs.group0_id == group0_id && rhs.addr == addr;
+ }
+};
+
+// If the peer has no cluster discovery running, it returns
+// no_discovery, which means the caller needs to retry
+// contacting this server after a pause. Otherwise it returns
+// its leader data or a list of peers.
+struct raft_peer_exchange {
+ std::variant<std::monostate, raft_leader_info, std::vector<raft::server_address>> info;
+};
+
+// A Raft RPC such as add_entries may succeed or, if the server is
+// not a leader or failed while the operation was in progress,
+// suggest another server. This type represents such a
+// response. It is legal to return success_or_bounce in cases
+// of uncertainty (the operation may have succeeded); if the user
+// decides to retry with a returned bounce address, it must
+// ensure idempotency of the request. The user of
+// success_or_bounce should also be prepared to handle a standard
+// kind of exception, e.g. an RPC timeout, in which case it can
+// use the same leader address to retry.
+struct raft_success_or_bounce {
+ // Set if the client should retry with another leader.
+ std::optional<raft::server_address> bounce;
+};
+
+/////////////////////////////////////////
+} // end of namespace service
+
diff --git a/service/raft/raft_group0.hh b/service/raft/raft_group0.hh
new file mode 100644
index 0000000000..0dd02cca1f
--- /dev/null
+++ b/service/raft/raft_group0.hh
@@ -0,0 +1,88 @@
+/*
+ * Copyright (C) 2021-present ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see <http://www.gnu.org/licenses/>.
+ */
+#pragma once
+#include "service/raft/raft_group_registry.hh"
+#include "service/raft/discovery.hh"
+#include "service/raft/messaging.hh"
+
+namespace cql3 { class query_processor; }
+
+namespace gms { class gossiper; }
+
+namespace service {
+
+class migration_manager;
+
+class raft_group0 {
+public:
+ seastar::gate _shutdown_gate;
+ seastar::abort_source _abort_source;
+ raft_group_registry& _raft_gr;
+ netw::messaging_service& _ms;
+ gms::gossiper& _gossiper;
+ cql3::query_processor& _qp;
+ service::migration_manager& _mm;
+ // Status of leader discovery. Initially there is no group 0,
+ // and the variant contains no state. During initial cluster
+ // bootstrap a discovery object is created, which is then
+ // substituted by group0 id when a leader is discovered or
+ // created.
+ std::variant<std::monostate, service::discovery, raft::group_id> _group0;
+
+ raft_server_for_group create_server_for_group(raft::group_id id, raft::server_address my_addr);
+ future<raft::server_address> load_or_create_my_addr();
+public:
+ raft_group0(service::raft_group_registry& raft_gr,
+ netw::messaging_service& ms,
+ gms::gossiper& gs,
+ cql3::query_processor& qp,
+ migration_manager& mm);
+
+ future<> abort() {
+ _abort_source.request_abort();
+ return _shutdown_gate.close();
+ }
+
+ // A helper function to discover Raft Group 0 leader in
+ // absence of running group 0 server.
+ future<raft_leader_info> discover_leader(raft::server_address my_addr);
+ // A helper to run Raft RPC in a loop handling bounces
+ // and trying different leaders until it succeeds.
+ future<> rpc_until_success(raft::server_address addr, auto rpc);
+
+ // Join this node to the cluster-wide Raft group.
+ // Called during bootstrap. Is idempotent - it
+ // does nothing if already done, or resumes from the
+ // unfinished state if aborted. The result is that
+ // the raft service has group 0 running.
+ future<> join_raft_group0();
+
+ // Remove the node from the cluster-wide raft group.
+ // This procedure is idempotent. In case of replace node,
+ // it removes the replaced node from the group, since
+ // it can't do it by itself (it's dead).
+ future<> leave_raft_group0();
+
+ // Handle peer_exchange RPC
+ future<raft_peer_exchange> peer_exchange(discovery::peer_list peers);
+};
+
+} // end of namespace service
diff --git a/service/raft/raft_group_registry.hh b/service/raft/raft_group_registry.hh
index 018f67e4f2..b9af1c3ae9 100644
--- a/service/raft/raft_group_registry.hh
+++ b/service/raft/raft_group_registry.hh
@@ -22,24 +22,14 @@
#include <seastar/core/future.hh>
#include <seastar/core/sharded.hh>
+#include <seastar/core/gate.hh>
#include "message/messaging_service_fwd.hh"
-#include "gms/inet_address.hh"
#include "raft/raft.hh"
#include "raft/server.hh"
#include "service/raft/raft_address_map.hh"
-namespace cql3 {
-
-class query_processor;
-
-} // namespace cql3
-
-namespace gms {
-
-class gossiper;
-
-} // namespace gms
+namespace gms { class gossiper; }
namespace service {
@@ -68,41 +58,51 @@ struct raft_server_for_group {
// to the owning shard for a given raft group_id.
class raft_group_registry : public seastar::peering_sharded_service<raft_group_registry> {
private:
+ seastar::gate _shutdown_gate;
netw::messaging_service& _ms;
- gms::gossiper& _gossiper;
- sharded<cql3::query_processor>& _qp;
- // Shard-local failure detector instance shared among all raft groups
- shared_ptr<raft_gossip_failure_detector> _fd;
-
// Raft servers along with the corresponding timers to tick each instance.
// Currently ticking every 100ms.
std::unordered_map<raft::group_id, raft_server_for_group> _servers;
// inet_address:es for remote raft servers known to us
raft_address_map<> _srv_address_mappings;
+ // Shard-local failure detector instance shared among all raft groups
+ seastar::shared_ptr<raft_gossip_failure_detector> _fd;
void init_rpc_verbs();
seastar::future<> uninit_rpc_verbs();
seastar::future<> stop_servers();
- raft_server_for_group create_server_for_group(raft::group_id id);
-
raft_server_for_group& server_for_group(raft::group_id id);
-public:
- raft_group_registry(netw::messaging_service& ms, gms::gossiper& gs, sharded<cql3::query_processor>& qp);
- // To integrate Raft service into the boot procedure, the
- // initialization is split into 2 steps:
- // - in sharded::start(), we construct an instance of
- // raft_group_registry on each shard. The RPC is not available
- // yet and no groups are created. The created object is
- // useless but it won't crash on use.
- // - in init(), which must be invoked explicitly on each
- // shard after the query processor and database have started,
- // we boot all existing groups from the local system tables
- // and start RPC
- seastar::future<> init();
- // Must be invoked explicitly on each shard to stop this service.
- seastar::future<> uninit();
+ // Group 0 id, valid only on shard 0 after boot is over
+ std::optional<raft::group_id> _group0_id;
+public:
+ // A helper function to get the current Raft group leader.
+ // Checks if this server is the leader and, if so,
+ // returns its address.
+ // Otherwise one of two cases is possible:
+ // 1. This is a follower of a stable leader.
+ // Returns the address of that leader.
+ // 2. This server is neither a leader nor a follower of an
+ // existing leader, e.g. an election is in progress or this
+ // server lost leadership just recently and is a follower with
+ // no leader. Instead of waiting for the election to finish,
+ // perhaps indefinitely, orders the configuration
+ // lexicographically and returns the next server address.
+ // This helps avoid a deadlock if e.g. this server is
+ // partitioned away from the majority: rather than blocking,
+ // we switch to another server.
+ //
+ // Internal errors are propagated up unchanged.
+ //
+ future<raft::server_address> guess_leader(raft::group_id gid);
+
+ raft_group_registry(netw::messaging_service& ms, gms::gossiper& gs);
+
+ // Called manually at start
+ seastar::future<> start();
+ // Called by sharded<>::stop()
+ seastar::future<> stop();
raft_rpc& get_rpc(raft::group_id gid);
@@ -110,10 +110,15 @@ class raft_group_registry : public seastar::peering_sharded_service<raft_group_r
// there is no such group.
raft::server& get_server(raft::group_id gid);
+ // Return an instance of group 0. Valid only on shard 0,
+ // after boot is complete
+ raft::server& group0();
+
// Start raft server instance, store in the map of raft servers and
// arm the associated timer to tick the server.
future<> start_server_for_group(raft_server_for_group grp);
unsigned shard_for_group(const raft::group_id& gid) const;
+ shared_ptr<raft_gossip_failure_detector>& failure_detector() { return _fd; }
raft_address_map<>& address_map() { return _srv_address_mappings; }
};
diff --git a/service/storage_service.hh b/service/storage_service.hh
index d47037e595..d2c4fc385e 100644
--- a/service/storage_service.hh
+++ b/service/storage_service.hh
@@ -78,6 +78,8 @@ namespace service {
class raft_group_registry;
}
+namespace cql3 { class query_processor; }
+
namespace cql_transport { class controller; }
namespace cdc {
@@ -105,6 +107,7 @@ namespace service {
class storage_service;
class migration_manager;
+class raft_group0;
extern distributed<storage_service> _the_storage_service;
// DEPRECATED, DON'T USE!
@@ -179,6 +182,7 @@ class storage_service : public service::migration_listener, public gms::i_endpoi
gms::gossiper& _gossiper;
// Container for all Raft instances running on this shard.
raft_group_registry& _raft_gr;
+ std::unique_ptr<service::raft_group0> _group0;
sharded<netw::messaging_service>& _messaging;
sharded<service::migration_manager>& _migration_manager;
sharded<repair_service>& _repair;
@@ -432,7 +436,7 @@ class storage_service : public service::migration_listener, public gms::i_endpoi
*
* \see init_messaging_service_part
*/
- future<> init_server(bind_messaging_port do_bind = bind_messaging_port::yes);
+ future<> init_server(cql3::query_processor& qp, bind_messaging_port do_bind = bind_messaging_port::yes);
future<> join_cluster();
diff --git a/main.cc b/main.cc
index 04757e9b84..5bcda33d2a 100644
--- a/main.cc
+++ b/main.cc
@@ -869,11 +869,16 @@ int main(int ac, char** av) {
gossiper.start(std::ref(stop_signal.as_sharded_abort_source()), std::ref(feature_service), std::ref(token_metadata), std::ref(messaging), std::ref(*cfg), std::ref(gcfg)).get();
// #293 - do not stop anything
//engine().at_exit([]{ return gms::get_gossiper().stop(); });
- supervisor::notify("starting Raft service");
- raft_gr.start(std::ref(messaging), std::ref(gossiper), std::ref(qp)).get();
+ supervisor::notify("starting Raft Group Registry service");
+ raft_gr.start(std::ref(messaging), std::ref(gossiper)).get();
+ // XXX: stop_raft has to happen before query_processor
+ // is stopped, since some groups keep using the query
+ // processor until they are stopped inside stop_raft.
auto stop_raft = defer_verbose_shutdown("Raft", [&raft_gr] {
raft_gr.stop().get();
});
+ raft_gr.invoke_on_all(&service::raft_group_registry::start).get();
+
supervisor::notify("initializing storage service");
service::storage_service_config sscfg;
sscfg.available_memory = memory::stats().total_memory();
@@ -1103,11 +1108,6 @@ int main(int ac, char** av) {
auto stop_proxy_handlers = defer_verbose_shutdown("storage proxy RPC verbs", [&proxy] {
proxy.invoke_on_all(&service::storage_proxy::uninit_messaging_service).get();
});
- supervisor::notify("starting Raft RPC");
- raft_gr.invoke_on_all(&service::raft_group_registry::init).get();
- auto stop_raft_rpc = defer_verbose_shutdown("Raft RPC", [&raft_gr] {
- raft_gr.invoke_on_all(&service::raft_group_registry::uninit).get();
- });
supervisor::notify("starting streaming service");
streaming::stream_session::init_streaming_service(db, sys_dist_ks, view_update_generator, messaging, mm).get();
auto stop_streaming_service = defer_verbose_shutdown("streaming service", [] {
@@ -1217,7 +1217,7 @@ int main(int ac, char** av) {
});
with_scheduling_group(maintenance_scheduling_group, [&] {
- return ss.init_server();
+ return ss.init_server(qp.local());
}).get();
sst_format_selector.sync();
diff --git a/message/messaging_service.cc b/message/messaging_service.cc
index 51dcdff492..602c7dd248 100644
--- a/message/messaging_service.cc
+++ b/message/messaging_service.cc
@@ -67,6 +67,7 @@
#include "idl/paxos.dist.hh"
#include "idl/raft.dist.hh"
#include "idl/hinted_handoff.dist.hh"
+#include "idl/group0.dist.hh"
#include "serializer_impl.hh"
#include "serialization_visitors.hh"
#include "idl/consistency_level.dist.impl.hh"
@@ -90,6 +91,7 @@
#include "idl/messaging_service.dist.impl.hh"
#include "idl/paxos.dist.impl.hh"
#include "idl/raft.dist.impl.hh"
+#include "idl/group0.dist.impl.hh"
#include "idl/hinted_handoff.dist.impl.hh"
#include <seastar/rpc/lz4_compressor.hh>
#include <seastar/rpc/lz4_fragmented_compressor.hh>
@@ -547,6 +549,11 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
case messaging_verb::GOSSIP_ECHO:
case messaging_verb::GOSSIP_GET_ENDPOINT_STATES:
case messaging_verb::GET_SCHEMA_VERSION:
+ // Raft peer exchange runs mainly at boot, but it still
+ // should not be blocked by any data requests.
+ case messaging_verb::RAFT_PEER_EXCHANGE:
+ case messaging_verb::RAFT_ADD_SERVER:
+ case messaging_verb::RAFT_REMOVE_SERVER:
return 0;
case messaging_verb::PREPARE_MESSAGE:
case messaging_verb::PREPARE_DONE_MESSAGE:
@@ -1582,6 +1589,41 @@ future<db::hints::sync_point_check_response> messaging_service::send_hint_sync_p
return send_message_timeout<future<db::hints::sync_point_check_response>>(this, messaging_verb::HINT_SYNC_POINT_CHECK, std::move(id), timeout, std::move(request));
}
+void messaging_service::register_raft_peer_exchange(std::function<future<service::raft_peer_exchange>(const rpc::client_info&, rpc::opt_time_point, std::vector<raft::server_address>)>&& func) {
+ register_handler(this, netw::messaging_verb::RAFT_PEER_EXCHANGE, std::move(func));
+}
+future<> messaging_service::unregister_raft_peer_exchange() {
+ return unregister_handler(netw::messaging_verb::RAFT_PEER_EXCHANGE);
+}
+future<service::raft_peer_exchange> messaging_service::send_raft_peer_exchange(msg_addr id, clock_type::time_point timeout, const std::vector<raft::server_address>& peers) {
+ return send_message_timeout<service::raft_peer_exchange>(this, messaging_verb::RAFT_PEER_EXCHANGE, std::move(id), timeout, peers);
+}
+
+void messaging_service::register_raft_add_server(std::function<future<service::raft_success_or_bounce>(const rpc::client_info&, rpc::opt_time_point, raft::group_id gid, raft::server_address addr)>&& func) {
+ register_handler(this, netw::messaging_verb::RAFT_ADD_SERVER, std::move(func));
+}
+
+future<> messaging_service::unregister_raft_add_server() {
+ return unregister_handler(netw::messaging_verb::RAFT_ADD_SERVER);
+}
+
+future<service::raft_success_or_bounce> messaging_service::send_raft_add_server(msg_addr id, clock_type::time_point timeout, raft::group_id gid, raft::server_address addr) {
+ return send_message_timeout<service::raft_success_or_bounce>(this, messaging_verb::RAFT_ADD_SERVER, std::move(id), timeout, gid, addr);
+}
+
+void messaging_service::register_raft_remove_server(std::function<future<service::raft_success_or_bounce>(const rpc::client_info&, rpc::opt_time_point, raft::group_id gid, raft::server_id sid)>&& func) {
+ register_handler(this, netw::messaging_verb::RAFT_REMOVE_SERVER, std::move(func));
+}
+
+future<> messaging_service::unregister_raft_remove_server() {
+ return unregister_handler(netw::messaging_verb::RAFT_REMOVE_SERVER);
+}
+
+future<service::raft_success_or_bounce> messaging_service::send_raft_remove_server(msg_addr id, clock_type::time_point timeout, raft::group_id gid, raft::server_id sid) {
+ return send_message_timeout<service::raft_success_or_bounce>(this, messaging_verb::RAFT_REMOVE_SERVER, std::move(id), timeout, gid, sid);
+}
+
+
void init_messaging_service(sharded<messaging_service>& ms,
messaging_service::config mscfg, netw::messaging_service::scheduling_config scfg,
sstring ms_trust_store, sstring ms_cert, sstring ms_key, sstring ms_tls_prio, bool ms_client_auth) {
diff --git a/service/raft/raft_group0.cc b/service/raft/raft_group0.cc
new file mode 100644
index 0000000000..d68c538d21
--- /dev/null
+++ b/service/raft/raft_group0.cc
@@ -0,0 +1,301 @@
+/*
+ * Copyright (C) 2021-present ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see <http://www.gnu.org/licenses/>.
+ */
+#include "service/raft/raft_group0.hh"
+#include "service/raft/raft_rpc.hh"
+#include "service/raft/raft_gossip_failure_detector.hh"
+#include "service/raft/raft_sys_table_storage.hh"
+#include "service/raft/schema_raft_state_machine.hh"
+
+#include "message/messaging_service.hh"
+#include "cql3/query_processor.hh"
+#include "gms/gossiper.hh"
+#include "db/system_keyspace.hh"
+
+#include <seastar/core/smp.hh>
+#include <seastar/core/sleep.hh>
+#include <seastar/core/coroutine.hh>
+#include <seastar/util/log.hh>
+#include <seastar/util/defer.hh>
+
+namespace service {
+
+raft_group0::raft_group0(raft_group_registry& raft_gr,
+ netw::messaging_service& ms,
+ gms::gossiper& gs,
+ cql3::query_processor& qp,
+ service::migration_manager& mm)
+ : _raft_gr(raft_gr), _ms(ms), _gossiper(gs), _qp(qp), _mm(mm)
+{
+}
+
+seastar::future<raft::server_address> raft_group0::load_or_create_my_addr() {
+ assert(this_shard_id() == 0);
+ raft::server_address my_addr;
+ my_addr.id = raft::server_id{co_await db::system_keyspace::get_raft_server_id()};
+ if (my_addr.id == raft::server_id{}) {
+ my_addr.id = raft::server_id::create_random_id();
+ co_await db::system_keyspace::set_raft_server_id(my_addr.id.id);
+ }
+ my_addr.info = ser::serialize_to_buffer<bytes>(_gossiper.get_broadcast_address());
+ co_return my_addr;
+}
+
+raft_server_for_group raft_group0::create_server_for_group(raft::group_id gid,
+ raft::server_address my_addr) {
+
+ _raft_gr.address_map().set(my_addr.id,
+ ser::deserialize_from_buffer(my_addr.info, boost::type<gms::inet_address>{}),
+ false);
+ auto rpc = std::make_unique<raft_rpc>(_ms, _raft_gr.address_map(), gid, my_addr.id);
+ // Keep a reference to a specific RPC class.
+ auto& rpc_ref = *rpc;
+ auto storage = std::make_unique<raft_sys_table_storage>(_qp, gid);
+ auto state_machine = std::make_unique<schema_raft_state_machine>();
+ auto server = raft::create_server(my_addr.id, std::move(rpc), std::move(state_machine),
+ std::move(storage), _raft_gr.failure_detector(), raft::server::configuration());
+
+ // initialize the corresponding timer to tick the raft server instance
+ auto ticker = std::make_unique<raft_ticker_type>([srv = server.get()] { srv->tick(); });
+
+ return raft_server_for_group{
+ .gid = std::move(gid),
+ .server = std::move(server),
+ .ticker = std::move(ticker),
+ .rpc = rpc_ref,
+ };
+}
+
+future<raft_leader_info>
+raft_group0::discover_leader(raft::server_address my_addr) {
+ std::vector<raft::server_address> seeds;
+ seeds.reserve(_gossiper.get_seeds().size());
+ for (auto& seed : _gossiper.get_seeds()) {
+ if (seed == _gossiper.get_broadcast_address()) {
+ continue;
+ }
+ seeds.push_back({.info = ser::serialize_to_buffer<bytes>(seed)});
+ }
+ _group0 = discovery{my_addr, std::move(seeds)};
+ service::discovery& discovery = std::get<service::discovery>(_group0);
+ auto clear_discovery = defer([this] {
+ _group0 = std::monostate{};
+ });
+
+ struct tracker {
+ explicit tracker(discovery::output output_arg) : output(std::move(output_arg)) {}
+ discovery::output output;
+ promise<std::optional<raft_leader_info>> leader;
+ bool is_set = false;
+ void set_value(std::optional<raft_leader_info> opt_leader) {
+ is_set = true;
+ leader.set_value(std::move(opt_leader));
+ }
+ void set_exception() {
+ is_set = true;
+ leader.set_exception(std::current_exception());
+ }
+ };
+ // Send peer information to all known peers. If replies
+ // discover new peers, send peer information to them as well.
+ // As soon as we get leader information from any peer,
+ // return it. If there is no chosen leader, collect replies
+ // from all peers, then make this node the leader if it has
+ // the smallest id. Otherwise sleep and keep pinging peers
+ // till some other node becomes a leader and shares its leader_info
+ // with us.
+ while (true) {
+ auto tracker = make_lw_shared<struct tracker>(discovery.get_output());
+ if (std::holds_alternative<discovery::i_am_leader>(tracker->output)) {
+ co_return raft_leader_info{.addr = my_addr};
+ }
+ if (std::holds_alternative<discovery::pause>(tracker->output)) {
+ co_await seastar::sleep_abortable(std::chrono::milliseconds(100), _abort_source);
+ continue;
+ }
+ auto& request_list = std::get<discovery::request_list>(tracker->output);
+ auto timeout = db::timeout_clock::now() + std::chrono::milliseconds{100};
+ (void) parallel_for_each(request_list, [this, tracker, timeout, &discovery] (
+ std::pair<raft::server_address, discovery::peer_list>& req) -> future<> {
+ netw::msg_addr peer(ser::deserialize_from_buffer(req.first.info,
+ boost::type<gms::inet_address>{}));
+ co_await with_gate(_shutdown_gate,
+ [this, tracker, &discovery, peer, timeout, from = std::move(req.first), msg = std::move(req.second)] () -> future<> {
+
+ rslog.info("Sending discovery message to {}", peer);
+ auto reply = co_await _ms.send_raft_peer_exchange(peer, timeout, std::move(msg));
+ // Check if this loop iteration has completed already
+ // before accessing discovery, which may be gone.
+ if (tracker->is_set) {
+ co_return;
+ }
+ if (std::holds_alternative<discovery::peer_list>(reply.info)) {
+ discovery.response(from, std::move(std::get<discovery::peer_list>(reply.info)));
+ } else if (std::holds_alternative<raft_leader_info>(reply.info)) {
+ tracker->set_value(std::move(std::get<raft_leader_info>(reply.info)));
+ }
+ });
+ }).then_wrapped([tracker] (future<> f) -> future<> {
+ // When the leader is discovered, silence all
+ // errors.
+ if (tracker->is_set) {
+ co_return;
+ }
+ // Silence all runtime errors, such as host
+ // unreachable. Propagate rpc and system errors up.
+ try {
+ co_await std::move(f);
+ tracker->set_value({});
+ } catch (std::exception& e) {
+ if (dynamic_cast<std::runtime_error*>(&e) == nullptr) {
+ tracker->set_exception();
+ } else {
+ rslog.debug("discovery failed to send message: {}", e);
+ tracker->set_value({});
+ }
+ }
+ });
+ if (auto leader = co_await tracker->leader.get_future()) {
+ co_return *leader;
+ }
+ }
+}
+
+// Handle bounce response from the peer and redirect RPC
+// to the bounced peer, until success.
+// XXX handle shutdown
+future<> raft_group0::rpc_until_success(raft::server_address addr, auto rpc) {
+ while (true) {
+ auto success_or_bounce = co_await with_gate(_shutdown_gate, [this, &addr, &rpc] () ->
+ future<raft_success_or_bounce> {
+ auto timeout = db::timeout_clock::now() + std::chrono::milliseconds{1000};
+ netw::msg_addr peer(ser::deserialize_from_buffer(addr.info, boost::type<gms::inet_address>{}));
+ try {
+ co_return co_await rpc(peer, timeout);
+ } catch (std::exception& e) {
+ if (dynamic_cast<std::runtime_error*>(&e) != nullptr) {
+ // Re-try with the same peer.
+ co_return raft_success_or_bounce{.bounce = addr};
+ } else {
+ throw;
+ }
+ }
+ });
+ if (!success_or_bounce.bounce) {
+ break;
+ }
+ addr = *success_or_bounce.bounce;
+ co_await seastar::sleep_abortable(std::chrono::milliseconds(100), _abort_source);
+ }
+}
+
+future<> raft_group0::join_raft_group0() {
+ assert(this_shard_id() == 0);
+ auto my_addr = co_await load_or_create_my_addr();
+ raft::group_id group0_id = raft::group_id{co_await db::system_keyspace::get_raft_group0_id()};
+ if (group0_id != raft::group_id{}) {
+ co_await _raft_gr.start_server_for_group(create_server_for_group(group0_id, my_addr));
+ _group0 = group0_id;
+ co_return;
+ }
+ raft::server_address leader;
+
+ auto li = co_await discover_leader(my_addr);
+ group0_id = li.group0_id;
+ leader = li.addr;
+
+ raft::configuration initial_configuration;
+ if (leader.id == my_addr.id) {
+ // Time-based ordering for group identifiers may be
+ // useful to provide linearisability between group
+ // operations. Currently it's unused.
+ group0_id = raft::group_id{utils::UUID_gen::get_time_UUID()};
+ initial_configuration.current.emplace(my_addr);
+ }
+ auto grp = create_server_for_group(group0_id, my_addr);
+ co_await grp.server->bootstrap(std::move(initial_configuration));
+ co_await _raft_gr.start_server_for_group(std::move(grp));
+ if (leader.id != my_addr.id) {
+ co_await rpc_until_success(leader, [this, group0_id, my_addr] (auto peer, auto timeout)
+ -> future<raft_success_or_bounce> {
+ rslog.info("Sending add node RPC to {}", peer);
+ co_return co_await _ms.send_raft_add_server(peer, timeout, group0_id, my_addr);
+ });
+ }
+ co_await db::system_keyspace::set_raft_group0_id(group0_id.id);
+ // Allow peer_exchange() RPC to access group 0 only after group0_id is persisted.
+ _group0 = group0_id;
+}
+
+future<> raft_group0::leave_raft_group0() {
+ assert(this_shard_id() == 0);
+ raft::server_address my_addr;
+ my_addr.id = raft::server_id{co_await db::system_keyspace::get_raft_server_id()};
+ if (my_addr.id == raft::server_id{}) {
+ // Nothing to do
+ co_return;
+ }
+ my_addr.info = ser::serialize_to_buffer<bytes>(_gossiper.get_broadcast_address());
+ raft::group_id group0_id;
+ raft::server_address leader;
+ if (std::holds_alternative<raft::group_id>(_group0)) {
+ group0_id = std::get<raft::group_id>(_group0);
+ } else {
+ group0_id = raft::group_id{co_await db::system_keyspace::get_raft_group0_id()};
+ if (group0_id == raft::group_id{}) {
+ auto li = co_await discover_leader(my_addr);
+ group0_id = li.group0_id;
+ leader = li.addr;
+ if (leader.id == my_addr.id) {
+ co_return;
+ }
+ }
+ co_await _raft_gr.start_server_for_group(create_server_for_group(group0_id, my_addr));
+ }
+ leader = co_await _raft_gr.guess_leader(group0_id);
+ co_await rpc_until_success(leader, [this, group0_id, my_addr] (auto peer, auto timeout)
+ -> future<raft_success_or_bounce> {
+ co_return co_await _ms.send_raft_remove_server(peer, timeout, group0_id, my_addr.id);
+ });
+}
+
+future<raft_peer_exchange> raft_group0::peer_exchange(discovery::peer_list peers) {
+ return std::visit([this, peers = std::move(peers)] (auto&& d) -> future<raft_peer_exchange> {
+ using T = std::decay_t<decltype(d)>;
+ if constexpr (std::is_same_v<T, std::monostate>) {
+ // Discovery not started or we're persisting the
+ // leader information locally.
+ co_return raft_peer_exchange{std::monostate{}};
+ } else if constexpr (std::is_same_v<T, discovery>) {
+ // Use discovery to produce a response
+ co_return raft_peer_exchange{d.request(std::move(peers))};
+ } else if constexpr (std::is_same_v<T, raft::group_id>) {
+ // Obtain leader information from existing Group 0 server.
+ auto addr = co_await _raft_gr.guess_leader(std::get<raft::group_id>(_group0));
+ co_return raft_peer_exchange{raft_leader_info{
+ .group0_id = std::get<raft::group_id>(_group0),
+ .addr = addr
+ }};
+ }
+ }, _group0);
+}
+
+} // end of namespace service
+
diff --git a/service/raft/raft_group_registry.cc b/service/raft/raft_group_registry.cc
index 685e23941c..08ce811ec8 100644
--- a/service/raft/raft_group_registry.cc
+++ b/service/raft/raft_group_registry.cc
@@ -20,28 +20,20 @@
*/
#include "service/raft/raft_group_registry.hh"
#include "service/raft/raft_rpc.hh"
-#include "service/raft/raft_sys_table_storage.hh"
-#include "service/raft/schema_raft_state_machine.hh"
#include "service/raft/raft_gossip_failure_detector.hh"
-
#include "message/messaging_service.hh"
-#include "cql3/query_processor.hh"
-#include "gms/gossiper.hh"
+#include "serializer_impl.hh"
+
-#include <seastar/core/smp.hh>
#include <seastar/core/coroutine.hh>
-#include <seastar/core/on_internal_error.hh>
-#include <seastar/util/log.hh>
-#include <seastar/util/defer.hh>
namespace service {
logging::logger rslog("raft_group_registry");
-raft_group_registry::raft_group_registry(netw::messaging_service& ms, gms::gossiper& gs, sharded<cql3::query_processor>& qp)
- : _ms(ms), _gossiper(gs), _qp(qp), _fd(make_shared<raft_gossip_failure_detector>(gs, _srv_address_mappings))
+raft_group_registry::raft_group_registry(netw::messaging_service& ms, gms::gossiper& gossiper)
+ : _ms(ms), _fd(make_shared<raft_gossip_failure_detector>(gossiper, _srv_address_mappings))
{
- (void) _gossiper;
}
void raft_group_registry::init_rpc_verbs() {
@@ -107,6 +99,56 @@ void raft_group_registry::init_rpc_verbs() {
return make_ready_future<>();
});
});
+
+ auto cas_or_bounce = [this](raft::group_id gid, auto cas) -> future<raft_success_or_bounce> {
+ raft::server& server = get_server(gid);
+ try {
+ co_await server.cas(cas);
+ co_return raft_success_or_bounce{};
+ } catch (const std::exception& e) {
+ if (! raft::is_transient_error(e)) {
+ throw;
+ }
+ }
+ auto bounce = co_await guess_leader(gid);
+ co_return raft_success_or_bounce{.bounce = bounce};
+ };
+
+ auto raft_add_server_impl = [this, cas_or_bounce] (const rpc::client_info& cinfo,
+ rpc::opt_time_point timeout, raft::group_id gid, raft::server_address addr)
+ -> future<raft_success_or_bounce> {
+
+ return container().invoke_on(shard_for_group(gid), [gid, addr, cas_or_bounce] (
+ raft_group_registry& self) -> future<raft_success_or_bounce> {
+
+ return cas_or_bounce(gid, [addr] (const raft::server& server) -> future<raft::server_address_set> {
+ rslog.info("Adding node to Raft Group 0 {}",
+ ser::deserialize_from_buffer(addr.info, boost::type<gms::inet_address>{}));
+ auto configuration = server.get_configuration().current;
+ configuration.emplace(std::move(addr));
+ co_return configuration;
+ });
+ });
+ };
+
+ _ms.register_raft_add_server(raft_add_server_impl);
+
+ auto raft_remove_server_impl = [this, cas_or_bounce] (const rpc::client_info& cinfo,
+ rpc::opt_time_point timeout, raft::group_id gid, raft::server_id sid) -> future<raft_success_or_bounce> {
+
+ return container().invoke_on(shard_for_group(gid), [gid, sid, cas_or_bounce] (
+ raft_group_registry& self) -> future<raft_success_or_bounce> {
+
+ return cas_or_bounce(gid, [sid] (const raft::server& server) -> future<raft::server_address_set> {
+ auto configuration = server.get_configuration().current;
+ rslog.info("Removing node from Raft Group 0 {}", sid);
+ configuration.erase(raft::server_address{.id = sid});
+ co_return configuration;
+ });
+ });
+ };
+
+ _ms.register_raft_remove_server(raft_remove_server_impl);
}
future<> raft_group_registry::uninit_rpc_verbs() {
@@ -116,7 +158,9 @@ future<> raft_group_registry::uninit_rpc_verbs() {
_ms.unregister_raft_append_entries_reply(),
_ms.unregister_raft_vote_request(),
_ms.unregister_raft_vote_reply(),
- _ms.unregister_raft_timeout_now()
+ _ms.unregister_raft_timeout_now(),
+ _ms.unregister_raft_add_server(),
+ _ms.unregister_raft_remove_server()
).discard_result();
}
@@ -129,17 +173,19 @@ future<> raft_group_registry::stop_servers() {
co_await when_all_succeed(stop_futures.begin(), stop_futures.end());
}
-seastar::future<> raft_group_registry::init() {
+seastar::future<> raft_group_registry::start() {
// Once a Raft server starts, it soon times out
// and starts an election, so RPC must be ready by
// then to send VoteRequest messages.
co_return init_rpc_verbs();
}
-seastar::future<> raft_group_registry::uninit() {
- return uninit_rpc_verbs().then([this] {
- return stop_servers();
- });
+seastar::future<> raft_group_registry::stop() {
+ co_await when_all_succeed(
+ _shutdown_gate.close(),
+ uninit_rpc_verbs(),
+ stop_servers()
+ ).discard_result();
}
raft_server_for_group& raft_group_registry::server_for_group(raft::group_id gid) {
@@ -158,26 +204,8 @@ raft::server& raft_group_registry::get_server(raft::group_id gid) {
return *(server_for_group(gid).server);
}
-raft_server_for_group raft_group_registry::create_server_for_group(raft::group_id gid) {
-
- raft::server_id my_id = raft::server_id::create_random_id();
- auto rpc = std::make_unique<raft_rpc>(_ms, _srv_address_mappings, gid, my_id);
- // Keep a reference to a specific RPC class.
- auto& rpc_ref = *rpc;
- auto storage = std::make_unique<raft_sys_table_storage>(_qp.local(), gid);
- auto state_machine = std::make_unique<schema_raft_state_machine>();
- auto server = raft::create_server(my_id, std::move(rpc), std::move(state_machine),
- std::move(storage), _fd, raft::server::configuration());
-
- // initialize the corresponding timer to tick the raft server instance
- auto ticker = std::make_unique<raft_ticker_type>([srv = server.get()] { srv->tick(); });
-
- return raft_server_for_group{
- .gid = std::move(gid),
- .server = std::move(server),
- .ticker = std::move(ticker),
- .rpc = rpc_ref,
- };
+raft::server& raft_group_registry::group0() {
+ return *(server_for_group(*_group0_id).server);
}
future<> raft_group_registry::start_server_for_group(raft_server_for_group new_grp) {
@@ -187,6 +215,9 @@ future<> raft_group_registry::start_server_for_group(raft_server_for_group new_g
if (!inserted) {
on_internal_error(rslog, format("Attempt to add the second instance of raft server with the same gid={}", gid));
}
+ if (_servers.size() == 1 && this_shard_id() == 0) {
+ _group0_id = gid;
+ }
auto& grp = it->second;
try {
// start the server instance prior to arming the ticker timer.
@@ -206,5 +237,52 @@ unsigned raft_group_registry::shard_for_group(const raft::group_id& gid) const {
return 0; // schema raft server is always owned by shard 0
}
+future<raft::server_address>
+raft_group_registry::guess_leader(raft::group_id gid) {
+ raft::server& server = get_server(gid);
+ co_return co_await with_gate(_shutdown_gate, [this, &server] () -> future<raft::server_address> {
+ try {
+ co_await server.read_barrier();
+ co_return _srv_address_mappings.get_server_address(server.id());
+ } catch (std::exception& e) {
+ if (auto n_a_l = dynamic_cast<raft::not_a_leader*>(&e)) {
+ if (n_a_l->leader != raft::server_id{}) {
+ auto it = _srv_address_mappings.find(n_a_l->leader);
+ if (it) {
+ auto addr = raft::server_address{
+ .id = n_a_l->leader,
+ .info = ser::serialize_to_buffer<bytes>(*it)
+ };
+ co_return addr;
+ }
+ }
+ // Fall through into the round-robin mode if there is
+ // no mapping for this raft server id or if there is
+ // no stable leader.
+ } else if (! raft::is_transient_error(e)) {
+ throw;
+ }
+ // We're in a state of uncertainty, an election is
+ // in progress.
+ std::set<raft::server_address> sorted(
+ server.get_configuration().current.begin(),
+ server.get_configuration().current.end());
+ assert(!sorted.empty());
+ auto it = sorted.find(raft::server_address{.id = server.id()});
+ // The group is put into the registry on this server only
+ // after a majority commits the configuration change,
+ // but this server was not necessarily part of that
+ // majority. Hence, when round-robining over the list
+ // of servers we may arrive at a server which doesn't
+ // yet have itself as part of the config. In that case
+ // skip over to the first server which is part of the
+ // config.
+ if (it == sorted.end() || ++it == sorted.end()) {
+ co_return *sorted.begin();
+ }
+ co_return *it;
+ }
+ });
+}
} // end of namespace service
diff --git a/service/storage_service.cc b/service/storage_service.cc
index 4f27641455..b5fa6b096a 100644
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -48,6 +48,7 @@
#include "log.hh"
#include "service/migration_manager.hh"
#include "service/storage_proxy.hh"
+#include "service/raft/raft_group0.hh"
#include "to_string.hh"
#include "gms/gossiper.hh"
#include "gms/failure_detector.hh"
@@ -143,7 +144,6 @@ storage_service::storage_service(abort_source& abort_source,
if (snitch.local_is_initialized()) {
_listeners.emplace_back(make_lw_shared(snitch.local()->when_reconfigured(_snitch_reconfigure)));
}
- (void) _raft_gr;
}
void storage_service::enable_all_features() {
@@ -638,6 +638,15 @@ void storage_service::join_token_ring(int delay) {
// start participating in the ring.
set_gossip_tokens(_gossiper, _bootstrap_tokens, _cdc_gen_id);
+
+ // Until topology tables are managed by Raft, Gossip continues to be
+ // the primary source of truth, so we initialize Raft only
+ // once gossip tokens have been persisted; this way, if
+ // initialization fails, the node is visible to removenode,
+ // which can be used to clean this node's state from the
+ // cluster.
+ _group0->join_raft_group0().get();
+
set_mode(mode::NORMAL, "node is now in normal status", true);
if (get_token_metadata().sorted_tokens().empty()) {
@@ -737,6 +746,7 @@ void storage_service::bootstrap() {
slogger.debug("Removing replaced endpoint {} from system.peers", *replace_addr);
db::system_keyspace::remove_endpoint(*replace_addr).get();
}
+ _group0->leave_raft_group0().get();
}
_db.invoke_on_all([this] (database& db) {
@@ -1394,12 +1404,15 @@ future<> storage_service::uninit_messaging_service_part() {
return container().invoke_on_all(&service::storage_service::uninit_messaging_service);
}
-future<> storage_service::init_server(bind_messaging_port do_bind) {
+future<> storage_service::init_server(cql3::query_processor& qp, bind_messaging_port do_bind) {
assert(this_shard_id() == 0);
- return seastar::async([this, do_bind] {
+ return seastar::async([this, &qp, do_bind] {
_initialized = true;
+ _group0 = std::make_unique<raft_group0>(_raft_gr, _messaging.local(), _gossiper, qp,
+ _migration_manager.local());
+
std::unordered_set<inet_address> loaded_endpoints;
if (_db.local().get_config().load_ring_state()) {
slogger.info("Loading persisted ring state");
@@ -1506,9 +1519,13 @@ future<> storage_service::stop() {
// make sure nobody uses the semaphore
node_ops_singal_abort(std::nullopt);
_listeners.clear();
- return _schema_version_publisher.join().finally([this] {
- return std::move(_node_ops_abort_thread);
- });
+ future<> group0_f = _group0 ? _group0->abort() : make_ready_future<>();
+ return when_all_succeed(
+ _schema_version_publisher.join().finally([this] {
+ return std::move(_node_ops_abort_thread);
+ }),
+ std::move(group0_f)
+ ).discard_result();
}
future<> storage_service::check_for_endpoint_collision(std::unordered_set<gms::inet_address> initial_contact_nodes, const std::unordered_map<gms::inet_address, sstring>& loaded_peer_features, bind_messaging_port do_bind) {
@@ -2371,6 +2388,7 @@ future<> storage_service::removenode(sstring host_id_string, std::list<gms::inet
// Step 5: Announce the node has left
+ ss._group0->leave_raft_group0().get();
ss._gossiper.advertise_token_removed(endpoint, host_id).get();
std::unordered_set<token> tmp(tokens.begin(), tokens.end());
ss.excise(std::move(tmp), endpoint);
@@ -3605,10 +3623,29 @@ void storage_service::init_messaging_service() {
_messaging.local().register_replication_finished([] (gms::inet_address from) {
return get_local_storage_service().confirm_replication(from);
});
+
+ auto peer_exchange_impl = [this](const rpc::client_info& cinfo, rpc::opt_time_point timeout,
+ discovery::peer_list peers) -> future<raft_peer_exchange> {
+
+ return container().invoke_on(0 /* group 0 is on shard 0 */, [peers = std::move(peers)] (
+ storage_service& self) -> future<raft_peer_exchange> {
+
+ if (self._group0) {
+ return self._group0->peer_exchange(std::move(peers));
+ } else {
+ return make_ready_future<raft_peer_exchange>(raft_peer_exchange{std::monostate{}});
+ }
+ });
+ };
+
+ _messaging.local().register_raft_peer_exchange(peer_exchange_impl);
}
future<> storage_service::uninit_messaging_service() {
- return _messaging.local().unregister_replication_finished();
+ return when_all_succeed(
+ _messaging.local().unregister_replication_finished(),
+ _messaging.local().unregister_raft_peer_exchange()
+ ).discard_result();
}
void storage_service::do_isolate_on_error(disk_error type)
@@ -3673,6 +3710,7 @@ future<> storage_service::force_remove_completion() {
slogger.warn("No host_id is found for endpoint {}", endpoint);
continue;
}
+ ss._group0->leave_raft_group0().get();
ss._gossiper.advertise_token_removed(endpoint, host_id).get();
std::unordered_set<token> tokens_set(tokens.begin(), tokens.end());
ss.excise(tokens_set, endpoint);
diff --git a/test/boost/gossip_test.cc b/test/boost/gossip_test.cc
index cb5da57a1b..3eb5ac836e 100644
--- a/test/boost/gossip_test.cc
+++ b/test/boost/gossip_test.cc
@@ -90,8 +90,9 @@ SEASTAR_TEST_CASE(test_boot_shutdown){
service::storage_service_config sscfg;
sscfg.available_memory = memory::stats().total_memory();
- raft_gr.start(std::ref(_messaging), std::ref(gms::get_gossiper()), std::ref(qp)).get();
+ raft_gr.start(std::ref(_messaging), std::ref(gms::get_gossiper())).get();
auto stop_raft = defer([&raft_gr] { raft_gr.stop().get(); });
+ raft_gr.invoke_on_all(&service::raft_group_registry::start).get();
service::get_storage_service().start(std::ref(abort_sources),
std::ref(db), std::ref(gms::get_gossiper()),
diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc
index dbb88e4e43..d90ce162a0 100644
--- a/test/lib/cql_test_env.cc
+++ b/test/lib/cql_test_env.cc
@@ -535,8 +535,8 @@ class single_node_cql_env : public cql_test_env {
sharded<repair_service> repair;
sharded<cql3::query_processor> qp;
sharded<service::raft_group_registry> raft_gr;
- raft_gr.start(std::ref(ms), std::ref(gms::get_gossiper()), std::ref(qp)).get();
- auto stop_raft = defer([&raft_gr] { raft_gr.stop().get(); });
+ raft_gr.start(std::ref(ms), std::ref(gms::get_gossiper())).get();
+ raft_gr.invoke_on_all(&service::raft_group_registry::start).get();
auto& ss = service::get_storage_service();
service::storage_service_config sscfg;
@@ -655,19 +655,17 @@ class single_node_cql_env : public cql_test_env {
auto stop_database_d = defer([&db] {
stop_database(db).get();
});
+ // XXX: stop_raft before stopping the database and
+ // query processor. The group registry stops raft groups
+ // when stopped, and until then the groups may use
+ // the database and the query processor.
+ auto stop_raft = defer([&raft_gr] { raft_gr.stop().get(); });
db::system_keyspace::init_local_cache().get();
auto stop_local_cache = defer([] { db::system_keyspace::deinit_local_cache().get(); });
sys_dist_ks.start(std::ref(qp), std::ref(mm), std::ref(proxy)).get();
- // We need to have a system keyspace started and
- // initialized to initialize Raft service.
- raft_gr.invoke_on_all(&service::raft_group_registry::init).get();
- auto stop_raft_rpc = defer([&raft_gr] {
- raft_gr.invoke_on_all(&service::raft_group_registry::uninit).get();
- });
-
cdc_generation_service.start(std::ref(*cfg), std::ref(gms::get_gossiper()), std::ref(sys_dist_ks), std::ref(abort_sources), std::ref(token_metadata), std::ref(feature_service)).get();
auto stop_cdc_generation_service = defer([&cdc_generation_service] {
cdc_generation_service.stop().get();
@@ -680,7 +678,7 @@ class single_node_cql_env : public cql_test_env {
cdc.stop().get();
});
- service::get_local_storage_service().init_server(service::bind_messaging_port(false)).get();
+ service::get_local_storage_service().init_server(qp.local(), service::bind_messaging_port(false)).get();
service::get_local_storage_service().join_cluster().get();
auth::permissions_cache_config perm_cache_config;
diff --git a/test/manual/gossip.cc b/test/manual/gossip.cc
index aceb2484d1..cd54aab091 100644
--- a/test/manual/gossip.cc
+++ b/test/manual/gossip.cc
@@ -96,7 +96,7 @@ int main(int ac, char ** av) {
messaging.start(listen).get();
gms::get_gossiper().start(std::ref(abort_sources), std::ref(feature_service), std::ref(token_metadata), std::ref(messaging), std::ref(*cfg)).get();
- raft_gr.start(std::ref(messaging), std::ref(gms::get_gossiper()), std::ref(qp)).get();
+ raft_gr.start(std::ref(messaging), std::ref(gms::get_gossiper())).get();
auto stop_raft = defer([&raft_gr] { raft_gr.stop().get(); });
service::init_storage_service(std::ref(abort_sources),
--
2.25.1