[PATCH] service: raft: move group0 write path into a separate file

Gleb Natapov <gleb@scylladb.com>
May 15, 2022, 9:55:50 AM
to scylladb-dev@googlegroups.com
Writing into the group0 raft group on the client side involves locking
the state machine, choosing a state id, and checking for its presence
after the operation completes. That code currently resides in the
migration manager, since it is the only user of group0 today. In the
near future we will have more clients of group0, and they will all need
the same logic, so this patch moves it into a separate class,
raft_group0_client, that any future user of group0 can use to write
into it.
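
For reviewers, a minimal sketch of the call sequence a future group0
client would follow (hypothetical helper, not part of this patch; it
assumes schema_change keeps a `mutations` vector of canonical mutations,
and omits error handling and includes):

    future<> announce_change(service::raft_group0_client& client,
                             std::function<std::vector<canonical_mutation>(const service::group0_guard&)> build,
                             seastar::abort_source& as) {
        // Serializes with other local operations, performs a Raft read
        // barrier and records the observed/new group0 state ids.
        auto guard = co_await client.start_operation(&as);
        // `build` must construct the mutations using guard.write_timestamp();
        // the command packs them together with the state ids and the
        // history-table append.
        auto cmd = client.prepare_command(
                service::schema_change{.mutations{build(guard)}},
                guard, "description of the change");
        // Post the command and wait for it to be applied; throws
        // group0_concurrent_modification if another command sneaked in
        // between start_operation() and apply().
        co_await client.add_entry(std::move(cmd), std::move(guard), &as);
    }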

diff --git a/configure.py b/configure.py
index 8401425d47..8d35d76068 100755
--- a/configure.py
+++ b/configure.py
@@ -1020,6 +1020,7 @@ scylla_core = (['replica/database.cc',
'service/raft/raft_group_registry.cc',
'service/raft/discovery.cc',
'service/raft/raft_group0.cc',
+ 'service/raft/raft_group0_client.cc',
] + [Antlr3Grammar('cql3/Cql.g')] + [Thrift('interface/cassandra.thrift', 'Cassandra')] \
+ scylla_raft_core
)
diff --git a/service/migration_manager.hh b/service/migration_manager.hh
index 9e193a573a..63360e1877 100644
--- a/service/migration_manager.hh
+++ b/service/migration_manager.hh
@@ -26,6 +26,7 @@
#include "utils/UUID.hh"
#include "utils/serialized_action.hh"
#include "service/raft/raft_group_registry.hh"
+#include "service/raft/raft_group0_client.hh"

#include <vector>

@@ -55,34 +56,6 @@ class storage_proxy;
template<typename M>
concept MergeableMutation = std::is_same<M, canonical_mutation>::value || std::is_same<M, frozen_mutation>::value;

-// Obtaining this object means that all previously finished operations on group 0 are visible on this node.
-// It is also required in order to perform group 0 changes (through `announce`).
-// See `group0_guard::impl` for more detailed explanations.
-class group0_guard {
- friend class migration_manager;
- struct impl;
- std::unique_ptr<impl> _impl;
-
- group0_guard(std::unique_ptr<impl>);
-
-public:
- ~group0_guard();
- group0_guard(group0_guard&&) noexcept;
-
- utils::UUID observed_group0_state_id() const;
- utils::UUID new_group0_state_id() const;
-
- // Use this timestamp when creating group 0 mutations.
- api::timestamp_type write_timestamp() const;
-};
-
-class group0_concurrent_modification : public std::runtime_error {
-public:
- group0_concurrent_modification()
- : std::runtime_error("Failed to apply group 0 change due to concurrent modification")
- {}
-};
-
class migration_manager : public seastar::async_sharded_service<migration_manager>,
public gms::i_endpoint_state_change_subscriber,
public seastar::peering_sharded_service<migration_manager> {
@@ -98,21 +71,15 @@ class migration_manager : public seastar::async_sharded_service<migration_manage
service::storage_proxy& _storage_proxy;
gms::gossiper& _gossiper;
seastar::abort_source _as;
- service::raft_group_registry& _raft_gr;
+ service::raft_group0_client& _group0_client;
sharded<db::system_keyspace>& _sys_ks;
serialized_action _schema_push;
utils::UUID _schema_version_to_publish;

- friend class group0_state_machine;
- // See `group0_guard::impl` for explanation of the purpose of these locks.
- semaphore _group0_read_apply_mutex;
- semaphore _group0_operation_mutex;
-
- gc_clock::duration _group0_history_gc_duration;
-
+ friend class group0_state_machine; // needed for access to _messaging
size_t _concurrent_ddl_retries;
public:
- migration_manager(migration_notifier&, gms::feature_service&, netw::messaging_service& ms, service::storage_proxy&, gms::gossiper& gossiper, service::raft_group_registry& raft_gr, sharded<db::system_keyspace>& sysks);
+ migration_manager(migration_notifier&, gms::feature_service&, netw::messaging_service& ms, service::storage_proxy&, gms::gossiper& gossiper, service::raft_group0_client& group0_client, sharded<db::system_keyspace>& sysks);

migration_notifier& get_notifier() { return _notifier; }
const migration_notifier& get_notifier() const { return _notifier; }
@@ -193,7 +160,7 @@ class migration_manager : public seastar::async_sharded_service<migration_manage
future<group0_guard> start_group0_operation();

// used to check if raft is enabled on the cluster
- bool is_raft_enabled() { return _raft_gr.is_enabled(); }
+ bool is_raft_enabled() { return _group0_client.is_enabled(); }

// Apply a group 0 change.
// The future resolves after the change is applied locally.
@@ -254,14 +221,8 @@ class migration_manager : public seastar::async_sharded_service<migration_manage
virtual future<> before_change(gms::inet_address endpoint, gms::endpoint_state current_state, gms::application_state new_statekey, const gms::versioned_value& newvalue) override { return make_ready_future(); }

public:
- // For tests only.
- void set_group0_history_gc_duration(gc_clock::duration);
-
// For tests only.
void set_concurrent_ddl_retries(size_t);
-
- // For tests only.
- semaphore& group0_operation_mutex();
};

future<column_mapping> get_column_mapping(utils::UUID table_id, table_schema_version v);
diff --git a/service/raft/group0_state_machine.hh b/service/raft/group0_state_machine.hh
index 690c25545a..b45e8455ba 100644
--- a/service/raft/group0_state_machine.hh
+++ b/service/raft/group0_state_machine.hh
@@ -13,6 +13,7 @@
#include "service/raft/raft_state_machine.hh"

namespace service {
+class raft_group0_client;
class migration_manager;
class storage_proxy;

@@ -58,10 +59,11 @@ struct group0_command {
// Raft state machine implementation for managing group 0 changes (e.g. schema changes).
// NOTE: group 0 raft server is always instantiated on shard 0.
class group0_state_machine : public raft_state_machine {
+ raft_group0_client& _client;
migration_manager& _mm;
storage_proxy& _sp;
public:
- group0_state_machine(migration_manager& mm, storage_proxy& sp) : _mm(mm), _sp(sp) {}
+ group0_state_machine(raft_group0_client& client, migration_manager& mm, storage_proxy& sp) : _client(client), _mm(mm), _sp(sp) {}
future<> apply(std::vector<raft::command_cref> command) override;
future<raft::snapshot_id> take_snapshot() override;
void drop_snapshot(raft::snapshot_id id) override;
diff --git a/service/raft/raft_group0.hh b/service/raft/raft_group0.hh
index 71aae4e334..3de0642e3e 100644
--- a/service/raft/raft_group0.hh
+++ b/service/raft/raft_group0.hh
@@ -17,6 +17,7 @@ namespace gms { class gossiper; }
namespace service {

class migration_manager;
+class raft_group0_client;

// Wrapper for `discovery` which persists the learned peers on disk.
class persistent_discovery {
@@ -65,6 +66,8 @@ class raft_group0 {
gms::gossiper& _gossiper;
cql3::query_processor& _qp;
service::migration_manager& _mm;
+ raft_group0_client& _client;
+
// Status of leader discovery. Initially there is no group 0,
// and the variant contains no state. During initial cluster
// bootstrap a discovery object is created, which is then
@@ -86,7 +89,8 @@ class raft_group0 {
netw::messaging_service& ms,
gms::gossiper& gs,
cql3::query_processor& qp,
- migration_manager& mm);
+ migration_manager& mm,
+ raft_group0_client& client);

future<> abort() {
if (!_abort_source.abort_requested()) {
diff --git a/service/raft/raft_group0_client.hh b/service/raft/raft_group0_client.hh
new file mode 100644
index 0000000000..7983362d58
--- /dev/null
+++ b/service/raft/raft_group0_client.hh
@@ -0,0 +1,79 @@
+/*
+ * Copyright (C) 2022-present ScyllaDB
+ *
+ * Modified by ScyllaDB
+ */
+
+/*
+ * SPDX-License-Identifier: (AGPL-3.0-or-later and Apache-2.0)
+ */
+
+#pragma once
+
+#include <memory>
+#include <seastar/core/semaphore.hh>
+#include <seastar/core/abort_source.hh>
+#include "service/raft/raft_group_registry.hh"
+#include "utils/UUID.hh"
+#include "timestamp.hh"
+#include "gc_clock.hh"
+#include "service/raft/group0_state_machine.hh"
+#include "db/system_keyspace.hh"
+#include "utils/fb_utilities.hh"
+
+namespace service {
+// Obtaining this object means that all previously finished operations on group 0 are visible on this node.
+// It is also required in order to perform group 0 changes.
+// See `group0_guard::impl` for more detailed explanations.
+class group0_guard {
+ friend class raft_group0_client;
+ struct impl;
+ std::unique_ptr<impl> _impl;
+
+ group0_guard(std::unique_ptr<impl>);
+
+public:
+ ~group0_guard();
+ group0_guard(group0_guard&&) noexcept;
+
+ utils::UUID observed_group0_state_id() const;
+ utils::UUID new_group0_state_id() const;
+
+ // Use this timestamp when creating group 0 mutations.
+ api::timestamp_type write_timestamp() const;
+};
+
+class group0_concurrent_modification : public std::runtime_error {
+public:
+ group0_concurrent_modification()
+ : std::runtime_error("Failed to apply group 0 change due to concurrent modification")
+ {}
+};
+
+// Singleton that exists only on shard zero. Used to post commands to group zero
+class raft_group0_client {
+ friend class group0_state_machine;
+ service::raft_group_registry& _raft_gr;
+ // See `group0_guard::impl` for explanation of the purpose of these locks.
+ semaphore _read_apply_mutex = semaphore(1);
+ semaphore _operation_mutex = semaphore(1);
+
+ gc_clock::duration _history_gc_duration = gc_clock::duration{std::chrono::duration_cast<gc_clock::duration>(std::chrono::weeks{1})};
+public:
+ raft_group0_client(service::raft_group_registry& raft_gr) : _raft_gr(raft_gr) {}
+
+ future<> add_entry(group0_command group0_cmd, group0_guard guard, seastar::abort_source* as = nullptr);
+ bool is_enabled() {
+ return _raft_gr.is_enabled();
+ }
+ future<group0_guard> start_operation(seastar::abort_source* as = nullptr);
+
+ group0_command prepare_command(schema_change change, group0_guard& guard, std::string_view description);
+
+ // for test only
+ void set_history_gc_duration(gc_clock::duration d);
+ semaphore& operation_mutex();
+};
+
+}
\ No newline at end of file
diff --git a/service/storage_service.hh b/service/storage_service.hh
index 1615feca00..92132977a3 100644
--- a/service/storage_service.hh
+++ b/service/storage_service.hh
@@ -88,6 +88,7 @@ namespace service {
class storage_service;
class migration_manager;
class raft_group0;
+class raft_group0_client;

enum class disk_error { regular, commit };

@@ -371,7 +372,7 @@ class storage_service : public service::migration_listener, public gms::i_endpoi
*
* \see init_messaging_service_part
*/
- future<> init_server(cql3::query_processor& qp);
+ future<> init_server(cql3::query_processor& qp, raft_group0_client& client);

future<> join_cluster();

diff --git a/test/lib/cql_test_env.hh b/test/lib/cql_test_env.hh
index b29b0a400f..c503a94720 100644
--- a/test/lib/cql_test_env.hh
+++ b/test/lib/cql_test_env.hh
@@ -49,6 +49,7 @@ namespace service {

class client_state;
class migration_manager;
+class raft_group0_client;

}

@@ -159,6 +160,8 @@ class cql_test_env {

virtual future<> refresh_client_state() = 0;

+ virtual service::raft_group0_client& get_raft_group0_client() = 0;
+
data_dictionary::database data_dictionary();
};

diff --git a/main.cc b/main.cc
index 5d2aa37023..4466a090a6 100644
--- a/main.cc
+++ b/main.cc
@@ -88,6 +88,7 @@
#include "tools/entry_point.hh"

#include "service/raft/raft_group_registry.hh"
+#include "service/raft/raft_group0_client.hh"

#include <boost/algorithm/string/join.hpp>

@@ -988,11 +989,15 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
auto stop_forward_service_handlers = defer_verbose_shutdown("forward service", [&forward_service] {
forward_service.stop().get();
});
+
+ // group0 client exists only on shard 0
+ service::raft_group0_client group0_client(raft_gr.local());
+
// #293 - do not stop anything
// engine().at_exit([&proxy] { return proxy.stop(); });
supervisor::notify("starting migration manager");
debug::the_migration_manager = &mm;
- mm.start(std::ref(mm_notifier), std::ref(feature_service), std::ref(messaging), std::ref(proxy), std::ref(gossiper), std::ref(raft_gr), std::ref(sys_ks)).get();
+ mm.start(std::ref(mm_notifier), std::ref(feature_service), std::ref(messaging), std::ref(proxy), std::ref(gossiper), std::ref(group0_client), std::ref(sys_ks)).get();
auto stop_migration_manager = defer_verbose_shutdown("migration manager", [&mm] {
mm.stop().get();
});
@@ -1251,7 +1256,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
}).get();

with_scheduling_group(maintenance_scheduling_group, [&] {
- return ss.local().init_server(qp.local());
+ return ss.local().init_server(qp.local(), group0_client);
}).get();

// Raft group0 can be joined before we wait for gossip to settle
diff --git a/service/migration_manager.cc b/service/migration_manager.cc
index d54fa0b5d9..792803538c 100644
--- a/service/migration_manager.cc
+++ b/service/migration_manager.cc
@@ -39,11 +39,6 @@
#include "serializer_impl.hh"
#include "idl/frozen_schema.dist.impl.hh"
#include "idl/uuid.dist.impl.hh"
-#include "idl/raft_storage.dist.hh"
-#include "idl/raft_storage.dist.impl.hh"
-#include "idl/group0_state_machine.dist.hh"
-#include "idl/group0_state_machine.dist.impl.hh"
-

namespace service {

@@ -55,12 +50,10 @@ const std::chrono::milliseconds migration_manager::migration_delay = 60000ms;
static future<schema_ptr> get_schema_definition(table_schema_version v, netw::messaging_service::msg_addr dst, netw::messaging_service& ms, service::storage_proxy& sp);

migration_manager::migration_manager(migration_notifier& notifier, gms::feature_service& feat, netw::messaging_service& ms,
- service::storage_proxy& storage_proxy, gms::gossiper& gossiper, service::raft_group_registry& raft_gr, sharded<db::system_keyspace>& sysks) :
- _notifier(notifier), _feat(feat), _messaging(ms), _storage_proxy(storage_proxy), _gossiper(gossiper), _raft_gr(raft_gr)
+ service::storage_proxy& storage_proxy, gms::gossiper& gossiper, service::raft_group0_client& group0_client, sharded<db::system_keyspace>& sysks) :
+ _notifier(notifier), _feat(feat), _messaging(ms), _storage_proxy(storage_proxy), _gossiper(gossiper), _group0_client(group0_client)
, _sys_ks(sysks)
, _schema_push([this] { return passive_announce(); })
- , _group0_read_apply_mutex{1}, _group0_operation_mutex{1}
- , _group0_history_gc_duration{std::chrono::duration_cast<gc_clock::duration>(std::chrono::weeks{1})}
, _concurrent_ddl_retries{10}
{
}
@@ -140,7 +133,7 @@ void migration_manager::init_messaging_service()
auto features = self._feat.cluster_schema_features();
auto& proxy = self._storage_proxy.container();
auto cm = co_await db::schema_tables::convert_schema_to_mutations(proxy, features);
- if (self._raft_gr.is_enabled() && options->group0_snapshot_transfer) {
+ if (self.is_raft_enabled() && options->group0_snapshot_transfer) {
// if `group0_snapshot_transfer` is `true`, the sender must also understand canonical mutations
// (`group0_snapshot_transfer` was added more recently).
if (!cm_retval_supported) {
@@ -892,166 +885,18 @@ future<> migration_manager::push_schema_mutation(const gms::inet_address& endpoi
return _messaging.send_definitions_update(id, std::move(fm), std::move(cm));
}

-/* *** Linearizing group 0 operations ***
- *
- * Group 0 changes (e.g. schema changes) are performed through Raft commands, which are executing in the same order
- * on every node, according to the order they appear in the Raft log
- * (executing a command happens in `group0_state_machine::apply`).
- * The commands contain mutations which modify tables that store group 0 state.
- *
- * However, constructing these mutations often requires reading the current state and validating the change against it.
- * This happens outside the code which applies the commands in order and may race with it. At the moment of applying
- * a command, the mutations stored within may be 'invalid' because a different command managed to be concurrently applied,
- * changing the state.
- *
- * For example, consider the sequence of commands:
- *
- * C1, C2, C3.
- *
- * Suppose that mutations inside C2 were constructed on a node which already applied C1. Thus, when applying C2,
- * the state of group 0 is the same as when the change was validated and its mutations were constructed.
- *
- * On the other hand, suppose that mutations inside C3 were also constructed on a node which applied C1, but didn't
- * apply C2 yet. This could easily happen e.g. when C2 and C3 were constructed concurrently on two different nodes.
- * Thus, when applying C3, the state of group 0 is different than it was when validating the change and constructing
- * its mutations: the state consists of the changes from C1 and C2, but when C3 was created, it used the state consisting
- * of changes from C1 (but not C2). Thus the mutations in C3 are not valid and we must not apply them.
- *
- * To protect ourselves from applying such 'obsolete' changes, we detect such commands during `group0_state_machine:apply`
- * and skip their mutations.
- *
- * For this, group 0 state was extended with a 'history table' (system.group0_history), which stores a sequence of
- * 'group 0 state IDs' (which are timeuuids). Each group 0 command also holds a unique state ID; if the command is successful,
- * the ID is appended to the history table. Each command also stores a 'previous state ID'; the change described by the command
- * is only applied when this 'previous state ID' is equal to the last state ID in the history table. If it's different,
- * we skip the change.
- *
- * To perform a group 0 change the user must first read the last state ID from the history table. This happens by obtaining
- * a `group0_guard` through `migration_manager::start_group0_operation`; the observed last state ID is stored in
- * `_observed_group0_state_id`. `start_group0_operation` also generates a new state ID for this change and stores it in
- * `_new_group0_state_id`. We ensure that the new state ID is greater than the observed state ID (in timeuuid order).
- *
- * The user then reads group 0 state, validates the change against the observed state, and constructs the mutations
- * which modify group 0 state. Finally, the user calls `announce`, passing the mutations and the guard.
- *
- * `announce` constructs a command for the group 0 state machine. The command stores the mutations and the state IDs.
- *
- * When the command is applied, we compare the stored observed state ID against the last state ID in the history table.
- * If it's the same, that means no change happened in between - no other command managed to 'sneak in' between the moment
- * the user started the operation and the moment the command was applied.
- *
- * The user must use `group0_guard::write_timestamp()` when constructing the mutations. The timestamp is extracted
- * from the new state ID. This ensures that mutations applied by successful commands have monotonic timestamps.
- * Indeed: the state IDs of successful commands are increasing (the previous state ID of a command that is successful
- * is equal to the new state ID of the previous successful command, and we ensure that the new state ID of a command
- * is greater than the previous state ID of this command).
- *
- * To perform a linearized group 0 read the user must also obtain a `group0_guard`. This ensures that all previously
- * completed changes are visible on this node, as obtaining the guard requires performing a Raft read barrier.
- *
- * Furthermore, obtaining the guard ensures that we don't read partial state, since it holds a lock that is also taken
- * during command application (`_read_apply_mutex_holder`). The lock is released just before sending the command to Raft.
- * TODO: we may still read partial state if we crash in the middle of command application.
- * See `group0_state_machine::apply` for a proposed fix.
- *
- * Obtaining the guard also ensures that there is no concurrent group 0 operation running on this node using another lock
- * (`_operation_mutex_holder`); if we allowed multiple concurrent operations to run, some of them could fail
- * due to the state ID protection. Concurrent operations may still run on different nodes. This lock is thus used
- * for improving liveness of operations running on the same node by serializing them.
- */
-struct group0_guard::impl {
- semaphore_units<> _operation_mutex_holder;
- semaphore_units<> _read_apply_mutex_holder;
-
- utils::UUID _observed_group0_state_id;
- utils::UUID _new_group0_state_id;
-
- impl(const impl&) = delete;
- impl& operator=(const impl&) = delete;
-
- impl(semaphore_units<> operation_mutex_holder, semaphore_units<> read_apply_mutex_holder, utils::UUID observed_group0_state_id, utils::UUID new_group0_state_id)
- : _operation_mutex_holder(std::move(operation_mutex_holder)), _read_apply_mutex_holder(std::move(read_apply_mutex_holder)),
- _observed_group0_state_id(observed_group0_state_id), _new_group0_state_id(new_group0_state_id)
- {}
-
- void release_read_apply_mutex() {
- assert(_read_apply_mutex_holder.count() == 1);
- _read_apply_mutex_holder.return_units(1);
- }
-};
-
-group0_guard::group0_guard(std::unique_ptr<impl> p) : _impl(std::move(p)) {}
-
-group0_guard::~group0_guard() = default;
-
-group0_guard::group0_guard(group0_guard&&) noexcept = default;
-
-utils::UUID group0_guard::observed_group0_state_id() const {
- return _impl->_observed_group0_state_id;
-}
-
-utils::UUID group0_guard::new_group0_state_id() const {
- return _impl->_new_group0_state_id;
-}
-
-api::timestamp_type group0_guard::write_timestamp() const {
- return utils::UUID_gen::micros_timestamp(_impl->_new_group0_state_id);
-}
-
future<> migration_manager::announce_with_raft(std::vector<mutation> schema, group0_guard guard, std::string_view description) {
assert(this_shard_id() == 0);
auto schema_features = _feat.cluster_schema_features();
auto adjusted_schema = db::schema_tables::adjust_schema_for_schema_features(schema, schema_features);

- group0_command group0_cmd {
- .change{schema_change{
+ auto group0_cmd = _group0_client.prepare_command(
+ schema_change{
.mutations{adjusted_schema.begin(), adjusted_schema.end()},
- }},
-
- .history_append{db::system_keyspace::make_group0_history_state_id_mutation(
- guard.new_group0_state_id(), _group0_history_gc_duration, description)},
-
- // IMPORTANT: the retry mechanism below assumes that `prev_state_id` is engaged (not nullopt).
- // Here it is: the return type of `guard.observerd_group0_state_id()` is `utils::UUID`.
- .prev_state_id{guard.observed_group0_state_id()},
- .new_state_id{guard.new_group0_state_id()},
+ },
+ guard, std::move(description));

- .creator_addr{utils::fb_utilities::get_broadcast_address()},
- .creator_id{_raft_gr.group0().id()},
- };
- raft::command cmd;
- ser::serialize(cmd, group0_cmd);
-
- // Release the read_apply mutex so `group0_state_machine::apply` can take it.
- guard._impl->release_read_apply_mutex();
-
- bool retry;
- do {
- retry = false;
- try {
- co_await _raft_gr.group0().add_entry(cmd, raft::wait_type::applied, &_as);
- } catch (const raft::dropped_entry& e) {
- mlogger.warn("`announce_with_raft`: `add_entry` returned \"{}\". Retrying the command (prev_state_id: {}, new_state_id: {})",
- e, group0_cmd.prev_state_id, group0_cmd.new_state_id);
- retry = true;
- } catch (const raft::commit_status_unknown& e) {
- mlogger.warn("`announce_with_raft`: `add_entry` returned \"{}\". Retrying the command (prev_state_id: {}, new_state_id: {})",
- e, group0_cmd.prev_state_id, group0_cmd.new_state_id);
- retry = true;
- } catch (const raft::not_a_leader& e) {
- // This should not happen since follower-to-leader entry forwarding is enabled in group 0.
- // Just fail the operation by propagating the error.
- mlogger.error("`announce_with_raft`: unexpected `not_a_leader` error: \"{}\". Please file an issue.", e);
- throw;
- }
-
- // Thanks to the `prev_state_id` check in `group0_state_machine::apply`, the command is idempotent.
- // It's safe to retry it, even if it means it will be applied multiple times; only the first time
- // can have an effect.
- } while (retry);
-
- // dropping the guard releases `_group0_operation_mutex`, allowing other operations
- // on this node to proceed
+ co_return co_await _group0_client.add_entry(std::move(group0_cmd), std::move(guard), &_as);
}

future<> migration_manager::announce_without_raft(std::vector<mutation> schema) {
@@ -1077,71 +922,18 @@ future<> migration_manager::announce_without_raft(std::vector<mutation> schema)

// Returns a future on the local application of the schema
future<> migration_manager::announce(std::vector<mutation> schema, group0_guard guard, std::string_view description) {
- if (_raft_gr.is_enabled()) {
- if (this_shard_id() != 0) {
- // This should not happen since all places which construct `group0_guard` also check that they are on shard 0.
- // Note: `group0_guard::impl` is private to this module, making this easy to verify.
- on_internal_error(mlogger, "announce: must run on shard 0");
- }
+ if (is_raft_enabled()) {
+ auto schema_features = _feat.cluster_schema_features();
+ auto adjusted_schema = db::schema_tables::adjust_schema_for_schema_features(schema, schema_features);

- auto new_group0_state_id = guard.new_group0_state_id();
co_await announce_with_raft(std::move(schema), std::move(guard), std::move(description));
-
- if (!(co_await db::system_keyspace::group0_history_contains(new_group0_state_id))) {
- // The command was applied but the history table does not contain the new group 0 state ID.
- // This means `apply` skipped the change due to previous state ID mismatch.
- throw group0_concurrent_modification{};
- }
} else {
co_await announce_without_raft(std::move(schema));
}
}

-static utils::UUID generate_group0_state_id(utils::UUID prev_state_id) {
- auto ts = api::new_timestamp();
- if (prev_state_id != utils::UUID{}) {
- auto lower_bound = utils::UUID_gen::micros_timestamp(prev_state_id);
- if (ts <= lower_bound) {
- ts = lower_bound + 1;
- }
- }
- return utils::UUID_gen::get_random_time_UUID_from_micros(std::chrono::microseconds{ts});
-}
-
future<group0_guard> migration_manager::start_group0_operation() {
- if (_raft_gr.is_enabled()) {
- if (this_shard_id() != 0) {
- on_internal_error(mlogger, "start_group0_operation: must run on shard 0");
- }
-
- auto operation_holder = co_await get_units(_group0_operation_mutex, 1);
- co_await _raft_gr.group0().read_barrier(&_as);
-
- // Take `_group0_read_apply_mutex` *after* read barrier.
- // Read barrier may wait for `group0_state_machine::apply` which also takes this mutex.
- auto read_apply_holder = co_await get_units(_group0_read_apply_mutex, 1);
-
- auto observed_group0_state_id = co_await db::system_keyspace::get_last_group0_state_id();
- auto new_group0_state_id = generate_group0_state_id(observed_group0_state_id);
-
- co_return group0_guard {
- std::make_unique<group0_guard::impl>(
- std::move(operation_holder),
- std::move(read_apply_holder),
- observed_group0_state_id,
- new_group0_state_id
- )
- };
- }
-
- co_return group0_guard {
- std::make_unique<group0_guard::impl>(
- semaphore_units<>{},
- semaphore_units<>{},
- utils::UUID{},
- generate_group0_state_id(utils::UUID{})
- )
- };
+ return _group0_client.start_operation(&_as);
}

/**
@@ -1368,16 +1160,8 @@ future<> migration_manager::on_alive(gms::inet_address endpoint, gms::endpoint_s
return make_ready_future();
}

-void migration_manager::set_group0_history_gc_duration(gc_clock::duration d) {
- _group0_history_gc_duration = d;
-}
-
void migration_manager::set_concurrent_ddl_retries(size_t n) {
_concurrent_ddl_retries = n;
}

-semaphore& migration_manager::group0_operation_mutex() {
- return _group0_operation_mutex;
-}
-
}
diff --git a/service/raft/group0_state_machine.cc b/service/raft/group0_state_machine.cc
index 02340d6d4f..81af44fbe6 100644
--- a/service/raft/group0_state_machine.cc
+++ b/service/raft/group0_state_machine.cc
@@ -26,6 +26,7 @@
#include "service/migration_manager.hh"
#include "db/system_keyspace.hh"
#include "service/storage_proxy.hh"
+#include "service/raft/raft_group0_client.hh"

namespace service {

@@ -57,7 +58,7 @@ future<> group0_state_machine::apply(std::vector<raft::command_cref> command) {
cmd.prev_state_id, cmd.new_state_id, cmd.creator_addr, cmd.creator_id);
slogger.trace("cmd.history_append: {}", cmd.history_append);

- auto read_apply_mutex_holder = co_await get_units(_mm._group0_read_apply_mutex, 1);
+ auto read_apply_mutex_holder = co_await get_units(_client._read_apply_mutex, 1);

if (cmd.prev_state_id) {
auto last_group0_state_id = co_await db::system_keyspace::get_last_group0_state_id();
@@ -128,7 +129,7 @@ future<> group0_state_machine::transfer_snapshot(gms::inet_address from, raft::s

// TODO ensure atomicity of snapshot application in presence of crashes (see TODO in `apply`)

- auto read_apply_mutex_holder = co_await get_units(_mm._group0_read_apply_mutex, 1);
+ auto read_apply_mutex_holder = co_await get_units(_client._read_apply_mutex, 1);

co_await _mm.merge_schema_from(addr, std::move(*cm));

diff --git a/service/raft/raft_group0.cc b/service/raft/raft_group0.cc
index 3f3dd26ada..b29060b60d 100644
--- a/service/raft/raft_group0.cc
+++ b/service/raft/raft_group0.cc
@@ -31,8 +31,9 @@ raft_group0::raft_group0(seastar::abort_source& abort_source,
netw::messaging_service& ms,
gms::gossiper& gs,
cql3::query_processor& qp,
- service::migration_manager& mm)
- : _abort_source(abort_source), _raft_gr(raft_gr), _ms(ms), _gossiper(gs), _qp(qp), _mm(mm)
+ service::migration_manager& mm,
+ raft_group0_client& client)
+ : _abort_source(abort_source), _raft_gr(raft_gr), _ms(ms), _gossiper(gs), _qp(qp), _mm(mm), _client(client)
{
}

@@ -54,7 +55,7 @@ raft_server_for_group raft_group0::create_server_for_group(raft::group_id gid,
raft::server_address my_addr) {

_raft_gr.address_map().set(my_addr);
- auto state_machine = std::make_unique<group0_state_machine>(_mm, _qp.proxy());
+ auto state_machine = std::make_unique<group0_state_machine>(_client, _mm, _qp.proxy());
auto rpc = std::make_unique<raft_rpc>(*state_machine, _ms, _raft_gr.address_map(), gid, my_addr.id);
// Keep a reference to a specific RPC class.
auto& rpc_ref = *rpc;
diff --git a/service/raft/raft_group0_client.cc b/service/raft/raft_group0_client.cc
new file mode 100644
index 0000000000..63e7c48ed9
--- /dev/null
+++ b/service/raft/raft_group0_client.cc
@@ -0,0 +1,265 @@
+/*
+ * Copyright (C) 2022-present ScyllaDB
+ *
+ * Modified by ScyllaDB
+ */
+
+/*
+ * SPDX-License-Identifier: (AGPL-3.0-or-later and Apache-2.0)
+ */
+
+#include <seastar/core/coroutine.hh>
+#include "raft_group0_client.hh"
+
+#include "frozen_schema.hh"
+#include "schema_mutations.hh"
+#include "serialization_visitors.hh"
+#include "serializer.hh"
+#include "idl/frozen_schema.dist.hh"
+#include "idl/uuid.dist.hh"
+#include "serializer_impl.hh"
+#include "idl/frozen_schema.dist.impl.hh"
+#include "idl/uuid.dist.impl.hh"
+#include "idl/raft_storage.dist.hh"
+#include "idl/raft_storage.dist.impl.hh"
+#include "idl/group0_state_machine.dist.hh"
+#include "idl/group0_state_machine.dist.impl.hh"
+
+
+namespace service {
+
+static logging::logger logger("group0_client");
+
+/* *** Linearizing group 0 operations ***
+ *
+ * Group 0 changes (e.g. schema changes) are performed through Raft commands, which are executing in the same order
+ * on every node, according to the order they appear in the Raft log
+ * (executing a command happens in `group0_state_machine::apply`).
+ * The commands contain mutations which modify tables that store group 0 state.
+ *
+ * However, constructing these mutations often requires reading the current state and validating the change against it.
+ * This happens outside the code which applies the commands in order and may race with it. At the moment of applying
+ * a command, the mutations stored within may be 'invalid' because a different command managed to be concurrently applied,
+ * changing the state.
+ *
+ * For example, consider the sequence of commands:
+ *
+ * C1, C2, C3.
+ *
+ * Suppose that mutations inside C2 were constructed on a node which already applied C1. Thus, when applying C2,
+ * the state of group 0 is the same as when the change was validated and its mutations were constructed.
+ *
+ * On the other hand, suppose that mutations inside C3 were also constructed on a node which applied C1, but didn't
+ * apply C2 yet. This could easily happen e.g. when C2 and C3 were constructed concurrently on two different nodes.
+ * Thus, when applying C3, the state of group 0 is different than it was when validating the change and constructing
+ * its mutations: the state consists of the changes from C1 and C2, but when C3 was created, it used the state consisting
+ * of changes from C1 (but not C2). Thus the mutations in C3 are not valid and we must not apply them.
+ *
+ * To protect ourselves from applying such 'obsolete' changes, we detect such commands during `group0_state_machine::apply`
+ * and skip their mutations.
+ *
+ * For this, group 0 state was extended with a 'history table' (system.group0_history), which stores a sequence of
+ * 'group 0 state IDs' (which are timeuuids). Each group 0 command also holds a unique state ID; if the command is successful,
+ * the ID is appended to the history table. Each command also stores a 'previous state ID'; the change described by the command
+ * is only applied when this 'previous state ID' is equal to the last state ID in the history table. If it's different,
+ * we skip the change.
+ *
+ * To perform a group 0 change the user must first read the last state ID from the history table. This happens by obtaining
+ * a `group0_guard` through `migration_manager::start_group0_operation`; the observed last state ID is stored in
+ * `_observed_group0_state_id`. `start_group0_operation` also generates a new state ID for this change and stores it in
+ * `_new_group0_state_id`. We ensure that the new state ID is greater than the observed state ID (in timeuuid order).
+ *
+ * The user then reads group 0 state, validates the change against the observed state, and constructs the mutations
+ * which modify group 0 state. Finally, the user calls `announce`, passing the mutations and the guard.
+ *
+ * `announce` constructs a command for the group 0 state machine. The command stores the mutations and the state IDs.
+ *
+ * When the command is applied, we compare the stored observed state ID against the last state ID in the history table.
+ * If it's the same, that means no change happened in between - no other command managed to 'sneak in' between the moment
+ * the user started the operation and the moment the command was applied.
+ *
+ * The user must use `group0_guard::write_timestamp()` when constructing the mutations. The timestamp is extracted
+ * from the new state ID. This ensures that mutations applied by successful commands have monotonic timestamps.
+ * Indeed: the state IDs of successful commands are increasing (the previous state ID of a command that is successful
+ * is equal to the new state ID of the previous successful command, and we ensure that the new state ID of a command
+ * is greater than the previous state ID of this command).
+ *
+ * To perform a linearized group 0 read the user must also obtain a `group0_guard`. This ensures that all previously
+ * completed changes are visible on this node, as obtaining the guard requires performing a Raft read barrier.
+ *
+ * Furthermore, obtaining the guard ensures that we don't read partial state, since it holds a lock that is also taken
+ * during command application (`_read_apply_mutex_holder`). The lock is released just before sending the command to Raft.
+ * TODO: we may still read partial state if we crash in the middle of command application.
+ * See `group0_state_machine::apply` for a proposed fix.
+ *
+ * Obtaining the guard also ensures that there is no concurrent group 0 operation running on this node using another lock
+ * (`_operation_mutex_holder`); if we allowed multiple concurrent operations to run, some of them could fail
+ * due to the state ID protection. Concurrent operations may still run on different nodes. This lock is thus used
+ * for improving liveness of operations running on the same node by serializing them.
+ */
+struct group0_guard::impl {
+ semaphore_units<> _operation_mutex_holder;
+ semaphore_units<> _read_apply_mutex_holder;
+
+ utils::UUID _observed_group0_state_id;
+ utils::UUID _new_group0_state_id;
+
+ impl(const impl&) = delete;
+ impl& operator=(const impl&) = delete;
+
+ impl(semaphore_units<> operation_mutex_holder, semaphore_units<> read_apply_mutex_holder, utils::UUID observed_group0_state_id, utils::UUID new_group0_state_id)
+ : _operation_mutex_holder(std::move(operation_mutex_holder)), _read_apply_mutex_holder(std::move(read_apply_mutex_holder)),
+ _observed_group0_state_id(observed_group0_state_id), _new_group0_state_id(new_group0_state_id)
+ {}
+
+ void release_read_apply_mutex() {
+ assert(_read_apply_mutex_holder.count() == 1);
+ _read_apply_mutex_holder.return_units(1);
+ }
+};
+
+group0_guard::group0_guard(std::unique_ptr<impl> p) : _impl(std::move(p)) {}
+
+group0_guard::~group0_guard() = default;
+
+group0_guard::group0_guard(group0_guard&&) noexcept = default;
+
+utils::UUID group0_guard::observed_group0_state_id() const {
+ return _impl->_observed_group0_state_id;
+}
+
+utils::UUID group0_guard::new_group0_state_id() const {
+ return _impl->_new_group0_state_id;
+}
+
+api::timestamp_type group0_guard::write_timestamp() const {
+ return utils::UUID_gen::micros_timestamp(_impl->_new_group0_state_id);
+}
+
+
+void raft_group0_client::set_history_gc_duration(gc_clock::duration d) {
+ _history_gc_duration = d;
+}
+
+semaphore& raft_group0_client::operation_mutex() {
+ return _operation_mutex;
+}
+
+future<> raft_group0_client::add_entry(group0_command group0_cmd, group0_guard guard, seastar::abort_source* as) {
+ if (this_shard_id() != 0) {
+ // This should not happen since all places which construct `group0_guard` also check that they are on shard 0.
+ // Note: `group0_guard::impl` is private to this module, making this easy to verify.
+ on_internal_error(logger, "add_entry: must run on shard 0");
+ }
+
+ auto new_group0_state_id = guard.new_group0_state_id();
+
+ co_await [&, guard = std::move(guard)] () -> future<> { // lambda is needed to limit guard's lifetime
+ raft::command cmd;
+ ser::serialize(cmd, group0_cmd);
+
+ // Release the read_apply mutex so `group0_state_machine::apply` can take it.
+ guard._impl->release_read_apply_mutex();
+
+ bool retry;
+ do {
+ retry = false;
+ try {
+ co_await _raft_gr.group0().add_entry(cmd, raft::wait_type::applied, as);
+ } catch (const raft::dropped_entry& e) {
+ logger.warn("add_entry: returned \"{}\". Retrying the command (prev_state_id: {}, new_state_id: {})",
+ e, group0_cmd.prev_state_id, group0_cmd.new_state_id);
+ retry = true;
+ } catch (const raft::commit_status_unknown& e) {
+ logger.warn("add_entry: returned \"{}\". Retrying the command (prev_state_id: {}, new_state_id: {})",
+ e, group0_cmd.prev_state_id, group0_cmd.new_state_id);
+ retry = true;
+ } catch (const raft::not_a_leader& e) {
+ // This should not happen since follower-to-leader entry forwarding is enabled in group 0.
+ // Just fail the operation by propagating the error.
+ logger.error("add_entry: unexpected `not_a_leader` error: \"{}\". Please file an issue.", e);
+ throw;
+ }
+
+ // Thanks to the `prev_state_id` check in `group0_state_machine::apply`, the command is idempotent.
+ // It's safe to retry it, even if it means it will be applied multiple times; only the first time
+ // can have an effect.
+ } while (retry);
+
+ // dropping the guard releases `_operation_mutex`, allowing other operations
+ // on this node to proceed
+ } ();
+
+ if (!(co_await db::system_keyspace::group0_history_contains(new_group0_state_id))) {
+ // The command was applied but the history table does not contain the new group 0 state ID.
+ // This means `apply` skipped the change due to previous state ID mismatch.
+ throw group0_concurrent_modification{};
+ }
+}
+
+static utils::UUID generate_group0_state_id(utils::UUID prev_state_id) {
+ auto ts = api::new_timestamp();
+ if (prev_state_id != utils::UUID{}) {
+ auto lower_bound = utils::UUID_gen::micros_timestamp(prev_state_id);
+ if (ts <= lower_bound) {
+ ts = lower_bound + 1;
+ }
+ }
+ return utils::UUID_gen::get_random_time_UUID_from_micros(std::chrono::microseconds{ts});
+}
+
+future<group0_guard> raft_group0_client::start_operation(seastar::abort_source* as) {
+ if (is_enabled()) {
+ if (this_shard_id() != 0) {
+ on_internal_error(logger, "start_group0_operation: must run on shard 0");
+ }
+
+ auto operation_holder = co_await get_units(_operation_mutex, 1);
+ co_await _raft_gr.group0().read_barrier(as);
+
+ // Take `_read_apply_mutex` *after* read barrier.
+ // Read barrier may wait for `group0_state_machine::apply` which also takes this mutex.
+ auto read_apply_holder = co_await get_units(_read_apply_mutex, 1);
+
+ auto observed_group0_state_id = co_await db::system_keyspace::get_last_group0_state_id();
+ auto new_group0_state_id = generate_group0_state_id(observed_group0_state_id);
+
+ co_return group0_guard {
+ std::make_unique<group0_guard::impl>(
+ std::move(operation_holder),
+ std::move(read_apply_holder),
+ observed_group0_state_id,
+ new_group0_state_id
+ )
+ };
+ }
+
+ co_return group0_guard {
+ std::make_unique<group0_guard::impl>(
+ semaphore_units<>{},
+ semaphore_units<>{},
+ utils::UUID{},
+ generate_group0_state_id(utils::UUID{})
+ )
+ };
+}
+
+group0_command raft_group0_client::prepare_command(schema_change change, group0_guard& guard, std::string_view description) {
+ group0_command group0_cmd {
+ .change{std::move(change)},
+ .history_append{db::system_keyspace::make_group0_history_state_id_mutation(
+ guard.new_group0_state_id(), _history_gc_duration, description)},
+
+ // IMPORTANT: the retry mechanism in `add_entry` assumes that `prev_state_id` is engaged (not nullopt).
+ // Here it is: the return type of `guard.observed_group0_state_id()` is `utils::UUID`.
+ .prev_state_id{guard.observed_group0_state_id()},
+ .new_state_id{guard.new_group0_state_id()},
+
+ .creator_addr{utils::fb_utilities::get_broadcast_address()},
+ .creator_id{_raft_gr.group0().id()}
+ };
+
+ return group0_cmd;
+}
+
+}
diff --git a/service/storage_service.cc b/service/storage_service.cc
index 3c6228361e..1d6050e734 100644
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -1310,14 +1310,14 @@ future<> storage_service::uninit_messaging_service_part() {
return container().invoke_on_all(&service::storage_service::uninit_messaging_service);
}

-future<> storage_service::init_server(cql3::query_processor& qp) {
+future<> storage_service::init_server(cql3::query_processor& qp, raft_group0_client& client) {
assert(this_shard_id() == 0);

- return seastar::async([this, &qp] {
+ return seastar::async([this, &qp, &client] {
set_mode(mode::STARTING);

_group0 = std::make_unique<raft_group0>(_abort_source, _raft_gr, _messaging.local(),
- _gossiper, qp, _migration_manager.local());
+ _gossiper, qp, _migration_manager.local(), client);

std::unordered_set<inet_address> loaded_endpoints;
if (_db.local().get_config().load_ring_state()) {
diff --git a/test/boost/group0_test.cc b/test/boost/group0_test.cc
index b9cd2cfd82..db9ea7d7e6 100644
--- a/test/boost/group0_test.cc
+++ b/test/boost/group0_test.cc
@@ -46,8 +46,8 @@ SEASTAR_TEST_CASE(test_group0_history_clearing_old_entries) {
co_await perform_schema_change();
BOOST_REQUIRE_EQUAL(co_await get_history_size(), size + 1);

- auto& mm = e.migration_manager().local();
- mm.set_group0_history_gc_duration(gc_clock::duration{0});
+ auto& rclient = e.get_raft_group0_client();
+ rclient.set_history_gc_duration(gc_clock::duration{0});

// When group0_history_gc_duration is 0, any change should clear all previous history entries.
co_await perform_schema_change();
@@ -55,7 +55,7 @@ SEASTAR_TEST_CASE(test_group0_history_clearing_old_entries) {
co_await perform_schema_change();
BOOST_REQUIRE_EQUAL(co_await get_history_size(), 1);

- mm.set_group0_history_gc_duration(duration_cast<gc_clock::duration>(weeks{1}));
+ rclient.set_history_gc_duration(duration_cast<gc_clock::duration>(weeks{1}));
co_await perform_schema_change();
BOOST_REQUIRE_EQUAL(co_await get_history_size(), 2);

@@ -82,7 +82,7 @@ SEASTAR_TEST_CASE(test_group0_history_clearing_old_entries) {
};

auto timestamps1 = co_await get_history_timestamps();
- mm.set_group0_history_gc_duration(duration_cast<gc_clock::duration>(sleep_dur));
+ rclient.set_history_gc_duration(duration_cast<gc_clock::duration>(sleep_dur));
co_await perform_schema_change();
auto timestamps2 = co_await get_history_timestamps();
// State IDs are sorted in descending order in the history table.
@@ -110,13 +110,14 @@ SEASTAR_TEST_CASE(test_group0_history_clearing_old_entries) {

SEASTAR_TEST_CASE(test_concurrent_group0_modifications) {
return do_with_cql_env([] (cql_test_env& e) -> future<> {
+ auto& rclient = e.get_raft_group0_client();
auto& mm = e.migration_manager().local();

- // migration_manager::_group0_operation_mutex prevents concurrent group 0 changes to be executed on a single node,
+ // raft_group0_client::_operation_mutex prevents concurrent group 0 changes from being executed on a single node,
// so in production `group0_concurrent_modification` never occurs if all changes go through a single node.
// For this test, give it more units so it doesn't block these concurrent executions
// in order to simulate a scenario where multiple nodes concurrently send schema changes.
- mm.group0_operation_mutex().signal(1337);
+ rclient.operation_mutex().signal(1337);

// Make DDL statement execution fail on the first attempt if it gets a concurrent modification exception.
mm.set_concurrent_ddl_retries(0);
@@ -178,7 +179,7 @@ SEASTAR_TEST_CASE(test_concurrent_group0_modifications) {
BOOST_REQUIRE_EQUAL(successes, N*M);

// Let's verify that the mutex indeed does its job.
- mm.group0_operation_mutex().consume(1337);
+ rclient.operation_mutex().consume(1337);
mm.set_concurrent_ddl_retries(0);

successes = co_await map_reduce(boost::irange(2*N, 3*N), std::bind_front(perform_schema_changes, std::ref(e), M), 0, std::plus{});
diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc
index 3a8e828cfd..3ac60c2523 100644
--- a/test/lib/cql_test_env.cc
+++ b/test/lib/cql_test_env.cc
@@ -61,6 +61,7 @@
#include "streaming/stream_manager.hh"
#include "debug.hh"
#include "db/schema_tables.hh"
+#include "service/raft/raft_group0_client.hh"

#include <sys/time.h>
#include <sys/resource.h>
@@ -130,6 +131,8 @@ class single_node_cql_env : public cql_test_env {
sharded<qos::service_level_controller>& _sl_controller;
sharded<service::migration_manager>& _mm;
sharded<db::batchlog_manager>& _batchlog_manager;
+ service::raft_group0_client& _group0_client;
+
private:
struct core_local_state {
service::client_state client_state;
@@ -178,7 +181,8 @@ class single_node_cql_env : public cql_test_env {
sharded<service::migration_notifier>& mnotifier,
sharded<service::migration_manager>& mm,
sharded<qos::service_level_controller> &sl_controller,
- sharded<db::batchlog_manager>& batchlog_manager)
+ sharded<db::batchlog_manager>& batchlog_manager,
+ service::raft_group0_client& client)
: _db(db)
, _qp(qp)
, _auth_service(auth_service)
@@ -188,6 +192,7 @@ class single_node_cql_env : public cql_test_env {
, _sl_controller(sl_controller)
, _mm(mm)
, _batchlog_manager(batchlog_manager)
+ , _group0_client(client)
{
adjust_rlimit();
}
@@ -399,6 +404,10 @@ class single_node_cql_env : public cql_test_env {
return _batchlog_manager;
}

+ virtual service::raft_group0_client& get_raft_group0_client() override {
+ return _group0_client;
+ }
+
virtual future<> refresh_client_state() override {
return _core_local.invoke_on_all([] (core_local_state& state) {
return state.client_state.maybe_update_per_service_level_params();
@@ -646,7 +655,10 @@ class single_node_cql_env : public cql_test_env {
forward_service.start(std::ref(ms), std::ref(proxy), std::ref(db), std::ref(token_metadata)).get();
auto stop_forward_service = defer([&forward_service] { forward_service.stop().get(); });

- mm.start(std::ref(mm_notif), std::ref(feature_service), std::ref(ms), std::ref(proxy), std::ref(gossiper), std::ref(raft_gr), std::ref(sys_ks)).get();
+ // group0 client exists only on shard 0
+ service::raft_group0_client group0_client(raft_gr.local());
+
+ mm.start(std::ref(mm_notif), std::ref(feature_service), std::ref(ms), std::ref(proxy), std::ref(gossiper), std::ref(group0_client), std::ref(sys_ks)).get();
auto stop_mm = defer([&mm] { mm.stop().get(); });

cql3::query_processor::memory_config qp_mcfg = {memory::stats().total_memory() / 256, memory::stats().total_memory() / 2560};
@@ -745,7 +757,7 @@ class single_node_cql_env : public cql_test_env {
cdc.stop().get();
});

- ss.local().init_server(qp.local()).get();
+ ss.local().init_server(qp.local(), group0_client).get();
try {
ss.local().join_cluster().get();
} catch (std::exception& e) {
@@ -803,7 +815,7 @@ class single_node_cql_env : public cql_test_env {
// The default user may already exist if this `cql_test_env` is starting with previously populated data.
}

- single_node_cql_env env(db, qp, auth_service, view_builder, view_update_generator, mm_notif, mm, std::ref(sl_controller), bm);
+ single_node_cql_env env(db, qp, auth_service, view_builder, view_update_generator, mm_notif, mm, std::ref(sl_controller), bm, group0_client);
env.start().get();
auto stop_env = defer([&env] { env.stop().get(); });

--
Gleb.

Avi Kivity <avi@scylladb.com>
May 15, 2022, 10:09:39 AM
to Gleb Natapov, scylladb-dev@googlegroups.com
Why is Raft licensed under Apache 2?



Gleb Natapov <gleb@scylladb.com>
May 15, 2022, 10:16:37 AM
to Avi Kivity, scylladb-dev@googlegroups.com
On Sun, May 15, 2022 at 05:09:36PM +0300, Avi Kivity wrote:
> > diff --git a/service/raft/raft_group0_client.hh b/service/raft/raft_group0_client.hh
> > new file mode 100644
> > index 0000000000..7983362d58
> > --- /dev/null
> > +++ b/service/raft/raft_group0_client.hh
> > @@ -0,0 +1,79 @@
> > +/*
> > + * Copyright (C) 2022-present ScyllaDB
> > + *
> > + * Modified by ScyllaDB
> > + */
> > +
> > +/*
> > + * SPDX-License-Identifier: (AGPL-3.0-or-later and Apache-2.0)
> > + */
>
>
> Why is Raft licensed under Apache 2?
>
This file is not part of Raft itself; it is a Raft client inside Scylla.
It still should not have Apache-2.0 here, since it does not contain
Cassandra code; the header was copied and pasted from the wrong place.

BTW, why is service/memory_limiter.hh Apache-2.0 only?

--
Gleb.

Avi Kivity <avi@scylladb.com>
May 15, 2022, 10:32:04 AM
to Gleb Natapov, scylladb-dev@googlegroups.com
Looks like an accident.


Gleb Natapov <gleb@scylladb.com>
May 16, 2022, 8:50:27 AM
to scylladb-dev@googlegroups.com
Writing into the group0 raft group on the client side involves locking
the state machine, choosing a state id, and checking for its presence
after the operation completes. That code currently resides in the
migration manager, since it is the only user of group0 today. In the
near future we will have more clients of group0, and they will all need
the same logic, so this patch moves it into a separate class,
raft_group0_client, that any future user of group0 can use to write
into it.

---
v1->v2:
- fix license

CI: https://jenkins.scylladb.com/job/releng/job/Scylla-CI/373/
diff --git a/service/raft/raft_group0_client.hh b/service/raft/raft_group0_client.hh
new file mode 100644
index 0000000000..5f8a59f6a8
--- /dev/null
+++ b/service/raft/raft_group0_client.hh
@@ -0,0 +1,79 @@
+/*
+ * Copyright (C) 2022-present ScyllaDB
+ *
+ * Modified by ScyllaDB
+ */
+
+/*
+ * SPDX-License-Identifier: AGPL-3.0-or-later
#include "service/raft/raft_group_registry.hh"
+#include "service/raft/raft_group0_client.hh"
@@ -26,6 +26,7 @@
#include "service/migration_manager.hh"
#include "db/system_keyspace.hh"
#include "service/storage_proxy.hh"
+#include "service/raft/raft_group0_client.hh"

namespace service {

@@ -57,7 +58,7 @@ future<> group0_state_machine::apply(std::vector<raft::command_cref> command) {
cmd.prev_state_id, cmd.new_state_id, cmd.creator_addr, cmd.creator_id);
slogger.trace("cmd.history_append: {}", cmd.history_append);

- auto read_apply_mutex_holder = co_await get_units(_mm._group0_read_apply_mutex, 1);
+ auto read_apply_mutex_holder = co_await get_units(_client._read_apply_mutex, 1);

if (cmd.prev_state_id) {
auto last_group0_state_id = co_await db::system_keyspace::get_last_group0_state_id();
@@ -128,7 +129,7 @@ future<> group0_state_machine::transfer_snapshot(gms::inet_address from, raft::s

// TODO ensure atomicity of snapshot application in presence of crashes (see TODO in `apply`)

- auto read_apply_mutex_holder = co_await get_units(_mm._group0_read_apply_mutex, 1);
+ auto read_apply_mutex_holder = co_await get_units(_client._read_apply_mutex, 1);

co_await _mm.merge_schema_from(addr, std::move(*cm));

diff --git a/service/raft/raft_group0.cc b/service/raft/raft_group0.cc
index 3f3dd26ada..b29060b60d 100644
--- a/service/raft/raft_group0.cc
+++ b/service/raft/raft_group0.cc
@@ -31,8 +31,9 @@ raft_group0::raft_group0(seastar::abort_source& abort_source,
netw::messaging_service& ms,
gms::gossiper& gs,
cql3::query_processor& qp,
- service::migration_manager& mm)
- : _abort_source(abort_source), _raft_gr(raft_gr), _ms(ms), _gossiper(gs), _qp(qp), _mm(mm)
+ service::migration_manager& mm,
+ raft_group0_client& client)
+ : _abort_source(abort_source), _raft_gr(raft_gr), _ms(ms), _gossiper(gs), _qp(qp), _mm(mm), _client(client)
{
}

@@ -54,7 +55,7 @@ raft_server_for_group raft_group0::create_server_for_group(raft::group_id gid,
raft::server_address my_addr) {

_raft_gr.address_map().set(my_addr);
- auto state_machine = std::make_unique<group0_state_machine>(_mm, _qp.proxy());
+ auto state_machine = std::make_unique<group0_state_machine>(_client, _mm, _qp.proxy());
auto rpc = std::make_unique<raft_rpc>(*state_machine, _ms, _raft_gr.address_map(), gid, my_addr.id);
// Keep a reference to a specific RPC class.
auto& rpc_ref = *rpc;
diff --git a/service/raft/raft_group0_client.cc b/service/raft/raft_group0_client.cc
new file mode 100644
index 0000000000..8551e4877a
--- /dev/null
+++ b/service/raft/raft_group0_client.cc
@@ -0,0 +1,265 @@
+/*
+ * Copyright (C) 2022-present ScyllaDB
+ *
+ * Modified by ScyllaDB
+ */
+
+/*
+ * SPDX-License-Identifier: AGPL-3.0-or-later

Gleb Natapov <gleb@scylladb.com>
May 18, 2022, 5:41:23 AM
to scylladb-dev@googlegroups.com
Ping.
--
Gleb.

Pavel Emelyanov <xemul@scylladb.com>
May 19, 2022, 2:07:32 AM
to Gleb Natapov, scylladb-dev@googlegroups.com
The patch doesn't apply to the current next (nor to master)

Gleb Natapov <gleb@scylladb.com>
May 19, 2022, 4:24:52 AM
to Pavel Emelyanov, scylladb-dev@googlegroups.com
On Thu, May 19, 2022 at 09:07:27AM +0300, Pavel Emelyanov wrote:
> The patch doesn't apply to the current next (nor to master)
>
Conflicts with Kamil's failure detector :( Will rebase.
--
Gleb.

Gleb Natapov <gleb@scylladb.com>
May 19, 2022, 4:30:37 AM
to scylladb-dev@googlegroups.com
Writing into the group0 raft group on the client side involves locking
the state machine, choosing a state id, and checking for its presence
after the operation completes. That code currently resides in the
migration manager, since it is the only user of group0 today. In the
near future we will have more clients of group0, and they will all need
the same logic, so this patch moves it into a separate class,
raft_group0_client, that any future user of group0 can use to write
into it.

---
v2->v3:
- rebase

v1->v2:
- fix license

CI: https://jenkins.scylladb.com/job/releng/job/Scylla-CI/418/

diff --git a/configure.py b/configure.py
index 70073b1987..023a0bc338 100755
--- a/configure.py
+++ b/configure.py
@@ -1020,6 +1020,7 @@ scylla_core = (['replica/database.cc',
'service/raft/discovery.cc',
'service/raft/raft_group0.cc',
'direct_failure_detector/failure_detector.cc',
+ 'service/raft/raft_group0_client.cc',
] + [Antlr3Grammar('cql3/Cql.g')] + [Thrift('interface/cassandra.thrift', 'Cassandra')] \
+ scylla_raft_core
)
diff --git a/service/migration_manager.hh b/service/migration_manager.hh
index 353b0912ca..8f594cef6a 100644
--- a/service/migration_manager.hh
+++ b/service/migration_manager.hh
@@ -23,6 +23,7 @@
#include "utils/UUID.hh"
#include "utils/serialized_action.hh"
#include "service/raft/raft_group_registry.hh"
+#include "service/raft/raft_group0_client.hh"

#include <vector>

@@ -52,34 +53,6 @@ class storage_proxy;
@@ -95,21 +68,15 @@ class migration_manager : public seastar::async_sharded_service<migration_manage
service::storage_proxy& _storage_proxy;
gms::gossiper& _gossiper;
seastar::abort_source _as;
- service::raft_group_registry& _raft_gr;
+ service::raft_group0_client& _group0_client;
sharded<db::system_keyspace>& _sys_ks;
serialized_action _schema_push;
utils::UUID _schema_version_to_publish;

- friend class group0_state_machine;
- // See `group0_guard::impl` for explanation of the purpose of these locks.
- semaphore _group0_read_apply_mutex;
- semaphore _group0_operation_mutex;
-
- gc_clock::duration _group0_history_gc_duration;
-
+ friend class group0_state_machine; // needed for access to _messaging
size_t _concurrent_ddl_retries;
public:
- migration_manager(migration_notifier&, gms::feature_service&, netw::messaging_service& ms, service::storage_proxy&, gms::gossiper& gossiper, service::raft_group_registry& raft_gr, sharded<db::system_keyspace>& sysks);
+ migration_manager(migration_notifier&, gms::feature_service&, netw::messaging_service& ms, service::storage_proxy&, gms::gossiper& gossiper, service::raft_group0_client& group0_client, sharded<db::system_keyspace>& sysks);

migration_notifier& get_notifier() { return _notifier; }
const migration_notifier& get_notifier() const { return _notifier; }
@@ -190,7 +157,7 @@ class migration_manager : public seastar::async_sharded_service<migration_manage
future<group0_guard> start_group0_operation();

// used to check if raft is enabled on the cluster
- bool is_raft_enabled() { return _raft_gr.is_enabled(); }
+ bool is_raft_enabled() { return _group0_client.is_enabled(); }

// Apply a group 0 change.
// The future resolves after the change is applied locally.
@@ -251,14 +218,8 @@ class migration_manager : public seastar::async_sharded_service<migration_manage
index 35124fcba3..22205a95f3 100644
--- a/service/storage_service.hh
+++ b/service/storage_service.hh
@@ -88,6 +88,7 @@ namespace service {
class storage_service;
class migration_manager;
class raft_group0;
+class raft_group0_client;

enum class disk_error { regular, commit };

@@ -371,7 +372,7 @@ class storage_service : public service::migration_listener, public gms::i_endpoi
*
* \see init_messaging_service_part
*/
- future<> init_server(cql3::query_processor& qp);
+ future<> init_server(cql3::query_processor& qp, raft_group0_client& client);

future<> join_cluster();

diff --git a/test/lib/cql_test_env.hh b/test/lib/cql_test_env.hh
index ab96835899..debce81ad0 100644
--- a/test/lib/cql_test_env.hh
+++ b/test/lib/cql_test_env.hh
@@ -50,6 +50,7 @@ namespace service {

class client_state;
class migration_manager;
+class raft_group0_client;

}

@@ -163,6 +164,8 @@ class cql_test_env {

virtual future<> refresh_client_state() = 0;

+ virtual service::raft_group0_client& get_raft_group0_client() = 0;
+
data_dictionary::database data_dictionary();
};

diff --git a/main.cc b/main.cc
index ac6d6e8347..b2233ca85b 100644
--- a/main.cc
+++ b/main.cc
@@ -88,6 +88,7 @@
#include "tools/entry_point.hh"

#include "service/raft/raft_group_registry.hh"
+#include "service/raft/raft_group0_client.hh"

#include <boost/algorithm/string/join.hpp>

@@ -1005,11 +1006,15 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
auto stop_forward_service_handlers = defer_verbose_shutdown("forward service", [&forward_service] {
forward_service.stop().get();
});
+
+ // group0 client exists only on shard 0
+ service::raft_group0_client group0_client(raft_gr.local());
+
// #293 - do not stop anything
// engine().at_exit([&proxy] { return proxy.stop(); });
supervisor::notify("starting migration manager");
debug::the_migration_manager = &mm;
- mm.start(std::ref(mm_notifier), std::ref(feature_service), std::ref(messaging), std::ref(proxy), std::ref(gossiper), std::ref(raft_gr), std::ref(sys_ks)).get();
+ mm.start(std::ref(mm_notifier), std::ref(feature_service), std::ref(messaging), std::ref(proxy), std::ref(gossiper), std::ref(group0_client), std::ref(sys_ks)).get();
auto stop_migration_manager = defer_verbose_shutdown("migration manager", [&mm] {
mm.stop().get();
});
@@ -1268,7 +1273,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
}).get();

with_scheduling_group(maintenance_scheduling_group, [&] {
- return ss.local().init_server(qp.local());
+ return ss.local().init_server(qp.local(), group0_client);
}).get();

// Raft group0 can be joined before we wait for gossip to settle
diff --git a/service/migration_manager.cc b/service/migration_manager.cc
index d8a6f105d8..17118e139d 100644
--- a/service/migration_manager.cc
+++ b/service/migration_manager.cc
@@ -37,11 +37,6 @@
#include "serializer_impl.hh"
#include "idl/frozen_schema.dist.impl.hh"
#include "idl/uuid.dist.impl.hh"
-#include "idl/raft_storage.dist.hh"
-#include "idl/raft_storage.dist.impl.hh"
-#include "idl/group0_state_machine.dist.hh"
-#include "idl/group0_state_machine.dist.impl.hh"
-

namespace service {

@@ -53,12 +48,10 @@ const std::chrono::milliseconds migration_manager::migration_delay = 60000ms;
static future<schema_ptr> get_schema_definition(table_schema_version v, netw::messaging_service::msg_addr dst, netw::messaging_service& ms, service::storage_proxy& sp);

migration_manager::migration_manager(migration_notifier& notifier, gms::feature_service& feat, netw::messaging_service& ms,
- service::storage_proxy& storage_proxy, gms::gossiper& gossiper, service::raft_group_registry& raft_gr, sharded<db::system_keyspace>& sysks) :
- _notifier(notifier), _feat(feat), _messaging(ms), _storage_proxy(storage_proxy), _gossiper(gossiper), _raft_gr(raft_gr)
+ service::storage_proxy& storage_proxy, gms::gossiper& gossiper, service::raft_group0_client& group0_client, sharded<db::system_keyspace>& sysks) :
+ _notifier(notifier), _feat(feat), _messaging(ms), _storage_proxy(storage_proxy), _gossiper(gossiper), _group0_client(group0_client)
, _sys_ks(sysks)
, _schema_push([this] { return passive_announce(); })
- , _group0_read_apply_mutex{1}, _group0_operation_mutex{1}
- , _group0_history_gc_duration{std::chrono::duration_cast<gc_clock::duration>(std::chrono::weeks{1})}
, _concurrent_ddl_retries{10}
{
}
@@ -138,7 +131,7 @@ void migration_manager::init_messaging_service()
auto features = self._feat.cluster_schema_features();
auto& proxy = self._storage_proxy.container();
auto cm = co_await db::schema_tables::convert_schema_to_mutations(proxy, features);
- if (self._raft_gr.is_enabled() && options->group0_snapshot_transfer) {
+ if (self.is_raft_enabled() && options->group0_snapshot_transfer) {
// if `group0_snapshot_transfer` is `true`, the sender must also understand canonical mutations
// (`group0_snapshot_transfer` was added more recently).
if (!cm_retval_supported) {
@@ -890,166 +883,18 @@ future<> migration_manager::push_schema_mutation(const gms::inet_address& endpoi
@@ -1075,71 +920,18 @@ future<> migration_manager::announce_without_raft(std::vector<mutation> schema)
@@ -1366,16 +1158,8 @@ future<> migration_manager::on_alive(gms::inet_address endpoint, gms::endpoint_s
index ab07702af1..b9115ea5cb 100644
--- a/service/raft/raft_group0.cc
+++ b/service/raft/raft_group0.cc
@@ -31,8 +31,9 @@ raft_group0::raft_group0(seastar::abort_source& abort_source,
netw::messaging_service& ms,
gms::gossiper& gs,
cql3::query_processor& qp,
- service::migration_manager& mm)
- : _abort_source(abort_source), _raft_gr(raft_gr), _ms(ms), _gossiper(gs), _qp(qp), _mm(mm)
+ service::migration_manager& mm,
+ raft_group0_client& client)
+ : _abort_source(abort_source), _raft_gr(raft_gr), _ms(ms), _gossiper(gs), _qp(qp), _mm(mm), _client(client)
{
}

@@ -54,7 +55,7 @@ raft_server_for_group raft_group0::create_server_for_group(raft::group_id gid,
raft::server_address my_addr) {

_raft_gr.address_map().set(my_addr);
- auto state_machine = std::make_unique<group0_state_machine>(_mm, _qp.proxy());
+ auto state_machine = std::make_unique<group0_state_machine>(_client, _mm, _qp.proxy());
auto rpc = std::make_unique<raft_rpc>(*state_machine, _ms, _raft_gr.address_map(), gid, my_addr.id,
[this] (gms::inet_address addr, bool added) {
// FIXME: we should eventually switch to UUID-based (not IP-based) node identification/communication scheme.
index 37be0da768..b2a04b7c3c 100644
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -1304,14 +1304,14 @@ future<> storage_service::uninit_messaging_service_part() {
index 9473e04b1e..bc688e2170 100644
--- a/test/lib/cql_test_env.cc
+++ b/test/lib/cql_test_env.cc
@@ -61,6 +61,7 @@
#include "streaming/stream_manager.hh"
#include "debug.hh"
#include "db/schema_tables.hh"
+#include "service/raft/raft_group0_client.hh"

#include <sys/time.h>
#include <sys/resource.h>
@@ -131,6 +132,8 @@ class single_node_cql_env : public cql_test_env {
sharded<service::migration_manager>& _mm;
sharded<db::batchlog_manager>& _batchlog_manager;
sharded<gms::gossiper>& _gossiper;
+ service::raft_group0_client& _group0_client;
+
private:
struct core_local_state {
service::client_state client_state;
@@ -180,7 +183,8 @@ class single_node_cql_env : public cql_test_env {
sharded<service::migration_manager>& mm,
sharded<qos::service_level_controller> &sl_controller,
sharded<db::batchlog_manager>& batchlog_manager,
- sharded<gms::gossiper>& gossiper)
+ sharded<gms::gossiper>& gossiper,
+ service::raft_group0_client& client)
: _db(db)
, _qp(qp)
, _auth_service(auth_service)
@@ -191,6 +195,7 @@ class single_node_cql_env : public cql_test_env {
, _mm(mm)
, _batchlog_manager(batchlog_manager)
, _gossiper(gossiper)
+ , _group0_client(client)
{
adjust_rlimit();
}
@@ -406,6 +411,10 @@ class single_node_cql_env : public cql_test_env {
return _gossiper;
}

+ virtual service::raft_group0_client& get_raft_group0_client() override {
+ return _group0_client;
+ }
+
virtual future<> refresh_client_state() override {
return _core_local.invoke_on_all([] (core_local_state& state) {
return state.client_state.maybe_update_per_service_level_params();
@@ -665,7 +674,10 @@ class single_node_cql_env : public cql_test_env {
forward_service.start(std::ref(ms), std::ref(proxy), std::ref(db), std::ref(token_metadata)).get();
auto stop_forward_service = defer([&forward_service] { forward_service.stop().get(); });

- mm.start(std::ref(mm_notif), std::ref(feature_service), std::ref(ms), std::ref(proxy), std::ref(gossiper), std::ref(raft_gr), std::ref(sys_ks)).get();
+ // group0 client exists only on shard 0
+ service::raft_group0_client group0_client(raft_gr.local());
+
+ mm.start(std::ref(mm_notif), std::ref(feature_service), std::ref(ms), std::ref(proxy), std::ref(gossiper), std::ref(group0_client), std::ref(sys_ks)).get();
auto stop_mm = defer([&mm] { mm.stop().get(); });

cql3::query_processor::memory_config qp_mcfg;
@@ -769,7 +781,7 @@ class single_node_cql_env : public cql_test_env {
cdc.stop().get();
});

- ss.local().init_server(qp.local()).get();
+ ss.local().init_server(qp.local(), group0_client).get();
try {
ss.local().join_cluster().get();
} catch (std::exception& e) {
@@ -827,7 +839,7 @@ class single_node_cql_env : public cql_test_env {
// The default user may already exist if this `cql_test_env` is starting with previously populated data.
}

- single_node_cql_env env(db, qp, auth_service, view_builder, view_update_generator, mm_notif, mm, std::ref(sl_controller), bm, gossiper);
+ single_node_cql_env env(db, qp, auth_service, view_builder, view_update_generator, mm_notif, mm, std::ref(sl_controller), bm, gossiper, group0_client);

Commit Bot

<bot@cloudius-systems.com>
unread,
May 19, 2022, 5:41:32 AM5/19/22
to scylladb-dev@googlegroups.com, Gleb Natapov
From: Gleb Natapov <gl...@scylladb.com>
Committer: Pavel Emelyanov <xe...@scylladb.com>
Branch: next

service: raft: move group0 write path into a separate file

Writing into the group0 raft group on the client side involves locking
the state machine, choosing a state id and checking for its presence
after the operation completes. The code that does this currently
resides in the migration manager, since it is for now the only user of
group0. In the near future we will have more clients of group0, and
they will all need the same logic, so this patch moves it into a
separate class, raft_group0_client, that any future user of group0 can
use to write into it.

Message-Id: <YoYAJwdT...@scylladb.com>

---
diff --git a/configure.py b/configure.py
--- a/configure.py
+++ b/configure.py
@@ -1020,6 +1020,7 @@ def find_headers(repodir, excluded_dirs):
'service/raft/discovery.cc',
'service/raft/raft_group0.cc',
'direct_failure_detector/failure_detector.cc',
+ 'service/raft/raft_group0_client.cc',
] + [Antlr3Grammar('cql3/Cql.g')] + [Thrift('interface/cassandra.thrift', 'Cassandra')] \
+ scylla_raft_core
)
diff --git a/main.cc b/main.cc
diff --git a/service/migration_manager.hh b/service/migration_manager.hh
@@ -95,21 +68,15 @@ private:
service::storage_proxy& _storage_proxy;
gms::gossiper& _gossiper;
seastar::abort_source _as;
- service::raft_group_registry& _raft_gr;
+ service::raft_group0_client& _group0_client;
sharded<db::system_keyspace>& _sys_ks;
serialized_action _schema_push;
utils::UUID _schema_version_to_publish;

- friend class group0_state_machine;
- // See `group0_guard::impl` for explanation of the purpose of these locks.
- semaphore _group0_read_apply_mutex;
- semaphore _group0_operation_mutex;
-
- gc_clock::duration _group0_history_gc_duration;
-
+ friend class group0_state_machine; // needed for access to _messaging
size_t _concurrent_ddl_retries;
public:
- migration_manager(migration_notifier&, gms::feature_service&, netw::messaging_service& ms, service::storage_proxy&, gms::gossiper& gossiper, service::raft_group_registry& raft_gr, sharded<db::system_keyspace>& sysks);
+ migration_manager(migration_notifier&, gms::feature_service&, netw::messaging_service& ms, service::storage_proxy&, gms::gossiper& gossiper, service::raft_group0_client& group0_client, sharded<db::system_keyspace>& sysks);

migration_notifier& get_notifier() { return _notifier; }
const migration_notifier& get_notifier() const { return _notifier; }
@@ -190,7 +157,7 @@ public:
future<group0_guard> start_group0_operation();

// used to check if raft is enabled on the cluster
- bool is_raft_enabled() { return _raft_gr.is_enabled(); }
+ bool is_raft_enabled() { return _group0_client.is_enabled(); }

// Apply a group 0 change.
// The future resolves after the change is applied locally.
@@ -251,14 +218,8 @@ private:
virtual future<> before_change(gms::inet_address endpoint, gms::endpoint_state current_state, gms::application_state new_statekey, const gms::versioned_value& newvalue) override { return make_ready_future(); }

public:
- // For tests only.
- void set_group0_history_gc_duration(gc_clock::duration);
-
// For tests only.
void set_concurrent_ddl_retries(size_t);
-
- // For tests only.
- semaphore& group0_operation_mutex();
};

future<column_mapping> get_column_mapping(utils::UUID table_id, table_schema_version v);
diff --git a/service/raft/group0_state_machine.cc b/service/raft/group0_state_machine.cc
--- a/service/raft/group0_state_machine.cc
+++ b/service/raft/group0_state_machine.cc
@@ -26,6 +26,7 @@
#include "service/migration_manager.hh"
#include "db/system_keyspace.hh"
#include "service/storage_proxy.hh"
+#include "service/raft/raft_group0_client.hh"

namespace service {

@@ -57,7 +58,7 @@ future<> group0_state_machine::apply(std::vector<raft::command_cref> command) {
cmd.prev_state_id, cmd.new_state_id, cmd.creator_addr, cmd.creator_id);
slogger.trace("cmd.history_append: {}", cmd.history_append);

- auto read_apply_mutex_holder = co_await get_units(_mm._group0_read_apply_mutex, 1);
+ auto read_apply_mutex_holder = co_await get_units(_client._read_apply_mutex, 1);

if (cmd.prev_state_id) {
auto last_group0_state_id = co_await db::system_keyspace::get_last_group0_state_id();
@@ -128,7 +129,7 @@ future<> group0_state_machine::transfer_snapshot(gms::inet_address from, raft::s

// TODO ensure atomicity of snapshot application in presence of crashes (see TODO in `apply`)

- auto read_apply_mutex_holder = co_await get_units(_mm._group0_read_apply_mutex, 1);
+ auto read_apply_mutex_holder = co_await get_units(_client._read_apply_mutex, 1);

co_await _mm.merge_schema_from(addr, std::move(*cm));

diff --git a/service/raft/group0_state_machine.hh b/service/raft/group0_state_machine.hh
--- a/service/raft/group0_state_machine.hh
+++ b/service/raft/group0_state_machine.hh
@@ -13,6 +13,7 @@
#include "service/raft/raft_state_machine.hh"

namespace service {
+class raft_group0_client;
class migration_manager;
class storage_proxy;

@@ -58,10 +59,11 @@ struct group0_command {
// Raft state machine implementation for managing group 0 changes (e.g. schema changes).
// NOTE: group 0 raft server is always instantiated on shard 0.
class group0_state_machine : public raft_state_machine {
+ raft_group0_client& _client;
migration_manager& _mm;
storage_proxy& _sp;
public:
- group0_state_machine(migration_manager& mm, storage_proxy& sp) : _mm(mm), _sp(sp) {}
+ group0_state_machine(raft_group0_client& client, migration_manager& mm, storage_proxy& sp) : _client(client), _mm(mm), _sp(sp) {}
future<> apply(std::vector<raft::command_cref> command) override;
future<raft::snapshot_id> take_snapshot() override;
void drop_snapshot(raft::snapshot_id id) override;
diff --git a/service/raft/raft_group0.cc b/service/raft/raft_group0.cc
--- a/service/raft/raft_group0.cc
+++ b/service/raft/raft_group0.cc
@@ -31,8 +31,9 @@ raft_group0::raft_group0(seastar::abort_source& abort_source,
netw::messaging_service& ms,
gms::gossiper& gs,
cql3::query_processor& qp,
- service::migration_manager& mm)
- : _abort_source(abort_source), _raft_gr(raft_gr), _ms(ms), _gossiper(gs), _qp(qp), _mm(mm)
+ service::migration_manager& mm,
+ raft_group0_client& client)
+ : _abort_source(abort_source), _raft_gr(raft_gr), _ms(ms), _gossiper(gs), _qp(qp), _mm(mm), _client(client)
{
}

@@ -54,7 +55,7 @@ raft_server_for_group raft_group0::create_server_for_group(raft::group_id gid,
raft::server_address my_addr) {

_raft_gr.address_map().set(my_addr);
- auto state_machine = std::make_unique<group0_state_machine>(_mm, _qp.proxy());
+ auto state_machine = std::make_unique<group0_state_machine>(_client, _mm, _qp.proxy());
auto rpc = std::make_unique<raft_rpc>(*state_machine, _ms, _raft_gr.address_map(), gid, my_addr.id,
[this] (gms::inet_address addr, bool added) {
// FIXME: we should eventually switch to UUID-based (not IP-based) node identification/communication scheme.
diff --git a/service/raft/raft_group0.hh b/service/raft/raft_group0.hh
--- a/service/raft/raft_group0.hh
+++ b/service/raft/raft_group0.hh
@@ -17,6 +17,7 @@ namespace gms { class gossiper; }
namespace service {

class migration_manager;
+class raft_group0_client;

// Wrapper for `discovery` which persists the learned peers on disk.
class persistent_discovery {
@@ -65,6 +66,8 @@ public:
gms::gossiper& _gossiper;
cql3::query_processor& _qp;
service::migration_manager& _mm;
+ raft_group0_client& _client;
+
// Status of leader discovery. Initially there is no group 0,
// and the variant contains no state. During initial cluster
// bootstrap a discovery object is created, which is then
@@ -86,7 +89,8 @@ public:
netw::messaging_service& ms,
gms::gossiper& gs,
cql3::query_processor& qp,
- migration_manager& mm);
+ migration_manager& mm,
+ raft_group0_client& client);

future<> abort() {
if (!_abort_source.abort_requested()) {
diff --git a/service/raft/raft_group0_client.cc b/service/raft/raft_group0_client.cc
--- a/service/raft/raft_group0_client.cc
diff --git a/service/raft/raft_group0_client.hh b/service/raft/raft_group0_client.hh
--- a/service/raft/raft_group0_client.hh
diff --git a/service/storage_service.cc b/service/storage_service.cc
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -1304,14 +1304,14 @@ future<> storage_service::uninit_messaging_service_part() {
return container().invoke_on_all(&service::storage_service::uninit_messaging_service);
}

-future<> storage_service::init_server(cql3::query_processor& qp) {
+future<> storage_service::init_server(cql3::query_processor& qp, raft_group0_client& client) {
assert(this_shard_id() == 0);

- return seastar::async([this, &qp] {
+ return seastar::async([this, &qp, &client] {
set_mode(mode::STARTING);

_group0 = std::make_unique<raft_group0>(_abort_source, _raft_gr, _messaging.local(),
- _gossiper, qp, _migration_manager.local());
+ _gossiper, qp, _migration_manager.local(), client);

std::unordered_set<inet_address> loaded_endpoints;
if (_db.local().get_config().load_ring_state()) {
diff --git a/service/storage_service.hh b/service/storage_service.hh
--- a/service/storage_service.hh
+++ b/service/storage_service.hh
@@ -88,6 +88,7 @@ namespace service {
class storage_service;
class migration_manager;
class raft_group0;
+class raft_group0_client;

enum class disk_error { regular, commit };

@@ -371,7 +372,7 @@ public:
*
* \see init_messaging_service_part
*/
- future<> init_server(cql3::query_processor& qp);
+ future<> init_server(cql3::query_processor& qp, raft_group0_client& client);

future<> join_cluster();

diff --git a/test/boost/group0_test.cc b/test/boost/group0_test.cc
--- a/test/boost/group0_test.cc
+++ b/test/boost/group0_test.cc
@@ -46,16 +46,16 @@ SEASTAR_TEST_CASE(test_group0_history_clearing_old_entries) {
co_await perform_schema_change();
BOOST_REQUIRE_EQUAL(co_await get_history_size(), size + 1);

- auto& mm = e.migration_manager().local();
- mm.set_group0_history_gc_duration(gc_clock::duration{0});
+ auto& rclient = e.get_raft_group0_client();
+ rclient.set_history_gc_duration(gc_clock::duration{0});

// When group0_history_gc_duration is 0, any change should clear all previous history entries.
co_await perform_schema_change();
BOOST_REQUIRE_EQUAL(co_await get_history_size(), 1);
diff --git a/test/lib/cql_test_env.hh b/test/lib/cql_test_env.hh
--- a/test/lib/cql_test_env.hh
+++ b/test/lib/cql_test_env.hh
@@ -50,6 +50,7 @@ namespace service {

class client_state;
class migration_manager;
+class raft_group0_client;

}

@@ -163,6 +164,8 @@ public:

Commit Bot

<bot@cloudius-systems.com>
unread,
May 19, 2022, 6:29:25 AM5/19/22
to scylladb-dev@googlegroups.com, Gleb Natapov
From: Gleb Natapov <gl...@scylladb.com>
Committer: Avi Kivity <a...@scylladb.com>
Branch: next

service: raft: move group0 write path into a separate file

Commit Bot

<bot@cloudius-systems.com>
unread,
May 19, 2022, 10:21:52 AM5/19/22
to scylladb-dev@googlegroups.com, Gleb Natapov
From: Gleb Natapov <gl...@scylladb.com>
Committer: Avi Kivity <a...@scylladb.com>
Branch: next

service: raft: move group0 write path into a separate file

Commit Bot

<bot@cloudius-systems.com>
unread,
May 19, 2022, 2:17:44 PM5/19/22
to scylladb-dev@googlegroups.com, Gleb Natapov
From: Gleb Natapov <gl...@scylladb.com>
Committer: Avi Kivity <a...@scylladb.com>
Branch: master

service: raft: move group0 write path into a separate file

Kamil Braun

<kbraun@scylladb.com>
unread,
May 24, 2022, 10:46:09 AM5/24/22
to Gleb Natapov, scylladb-dev
I know it was merged already, but leaving comments for potential future cleanups.

unused variables
Isn't the scope of `add_entry` limiting enough?
Or do you think it's important to release the guard before calling `system_keyspace::group0_history_contains(...)`? I don't think it is.

Even if it was, you wouldn't need a lambda, it would be enough to open a scope and move the guard into it:
{
    auto g = std::move(guard);
    raft::command cmd;
    ...
}

if (!(co_await db::system_keyspace::...)) {
    ...
}

The comment does not make sense in this context (previously, it was positioned above the retry loop)

Gleb Natapov

<gleb@scylladb.com>
unread,
May 25, 2022, 2:33:28 AM5/25/22
to Kamil Braun, scylladb-dev
I wanted to preserve the same release point as before the patch. And
yes, it does not make sense to hold the lock during the lookup; we can
admit another operation instead.
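
For illustration, the two possible release points look roughly like
this (a plain mutex in place of the group0 guard; history_contains()
is a stand-in for db::system_keyspace::group0_history_contains()):

#include <mutex>
#include <string>

std::mutex operation_mutex;
bool history_contains(const std::string&) { return true; } // stand-in

void add_entry_early_release(const std::string& new_state_id) {
    {
        std::lock_guard guard(operation_mutex); // plays the group0 guard
        // ... build the command and replicate it through raft ...
    }   // guard released here: the pre-patch release point
    // The lookup runs unlocked, so another group 0 operation can be
    // admitted while we verify that our change actually landed.
    if (!history_contains(new_state_id)) {
        // the real code throws group0_concurrent_modification here
    }
}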

> Even if it was, you wouldn't need a lambda, it would be enough to open a
> scope and move the guard into it:
> {
> auto g = std::move(guard);
> raft::command cmd;
> ...
> }
I am sure a compiler will generate very similar code.

>
> if (!(co_await db::system_keyspace::...)) {
> ...
> }
>
>

> > +
> > +group0_command raft_group0_client::prepare_command(schema_change change, group0_guard& guard, std::string_view description) {
> > +    group0_command group0_cmd {
> > +        .change{std::move(change)},
> > +        .history_append{db::system_keyspace::make_group0_history_state_id_mutation(
> > +            guard.new_group0_state_id(), _history_gc_duration, description)},
> > +
> > +        // IMPORTANT: the retry mechanism below assumes that `prev_state_id` is engaged (not nullopt).
> >
> The comment does not make sense in this context (previously, it was
> positioned above the retry loop)
>
"Below" can be dropped, but otherwise the retry look is still there and
its assumption is still the same.

> > +        // Here it is: the return type of `guard.observed_group0_state_id()` is `utils::UUID`.
> > + .prev_state_id{guard.observed_group0_state_id()},
> > + .new_state_id{guard.new_group0_state_id()},
> > +
> > + .creator_addr{utils::fb_utilities::get_broadcast_address()},
> > + .creator_id{_raft_gr.group0().id()}
> > + };
> > +
> > + return group0_cmd;
> > +}
> > +
> > +}
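
For illustration, the retry loop in question has roughly this shape
(self-contained stand-ins, not the real client API):

#include <cstddef>
#include <optional>
#include <stdexcept>

struct group0_concurrent_modification : std::runtime_error {
    group0_concurrent_modification()
        : std::runtime_error("concurrent group 0 modification") {}
};

struct command { std::optional<int> prev_state_id; int new_state_id; };

// Stand-ins: prepare_command() always engages prev_state_id, and
// add_entry() would throw group0_concurrent_modification on a lost race.
command prepare_command() { return {.prev_state_id = 0, .new_state_id = 1}; }
void add_entry(command) {}

void announce_with_retries(std::size_t max_retries) {
    for (std::size_t i = 0; i < max_retries; ++i) {
        // An engaged prev_state_id is what makes apply() reject a command
        // that lost the race, which in turn is what this loop retries on.
        try {
            add_entry(prepare_command());
            return;
        } catch (group0_concurrent_modification&) {
            continue; // re-observe group 0 state and try again
        }
    }
    throw group0_concurrent_modification();
}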

--
Gleb.

Kamil Braun

<kbraun@scylladb.com>
unread,
May 25, 2022, 4:55:13 AM5/25/22
to Gleb Natapov, scylladb-dev
I wasn't thinking about the compiler, I was thinking about the human, but never mind.