[PATCH v1 0/7] raft: testing: many nodes test

Alejo Sanchez

Jul 16, 2021, 11:58:49 AM
to scylla...@googlegroups.com, kos...@scylladb.com, gl...@scylladb.com, tgra...@scylladb.com, Alejo Sanchez
Factor out the replication test code, make it work with different clock
types, add some features, and add a many nodes test with steady_clock.
Also refactor the common test helper.

The many nodes test passes with 600 nodes, but with higher node counts it
needs prevote follower backoff and non-uniform candidate timeouts.
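
Roughly, the clock work (patches 3 and 4) amounts to templating the test
cluster on the clock type. A minimal sketch, assuming names from the diffs
below (this is an illustration, not the series' code):

    #include <chrono>
    #include <vector>
    #include <seastar/core/timer.hh>
    #include <seastar/core/lowres_clock.hh>

    // Illustrative stand-in for the raft_cluster helper in replication.hh.
    template <typename Clock>
    class raft_cluster {
        std::vector<seastar::timer<Clock>> _tickers;  // one ticker per server
        typename Clock::duration _tick_delta;         // tick period, now a member
    public:
        explicit raft_cluster(typename Clock::duration tick_delta)
            : _tick_delta(tick_delta) {}
    };

    // replication_test.cc can keep the coarse low-resolution clock...
    using lowres_cluster = raft_cluster<seastar::lowres_clock>;
    // ...while many_test.cc drives many nodes with the steady clock.
    using steady_cluster = raft_cluster<std::chrono::steady_clock>;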

Branch URL: https://github.com/alecco/scylla/tree/raft-many-05

Tests: unit ({dev}), unit ({debug}), unit ({release})

Alejo Sanchez (7):
raft: testing: refactor helper
raft: replication test: move common code out
raft: replication test: tick delta inside raft_cluster
raft: replication test: template clock type
raft: replication test: disconnect one or disconnect pair
raft: replication test: partition ranges
raft: testing: many nodes test

configure.py | 8 +-
test/raft/etcd_test.cc | 2 +
test/raft/fsm_test.cc | 2 +
test/raft/{helpers.hh => helpers.cc} | 107 +-
test/raft/helpers.hh | 145 +-
test/raft/many_test.cc | 48 +
.../{replication_test.cc => replication.hh} | 865 +++---------
test/raft/replication_test.cc | 1239 +----------------
8 files changed, 313 insertions(+), 2103 deletions(-)
copy test/raft/{helpers.hh => helpers.cc} (63%)
create mode 100644 test/raft/many_test.cc
copy test/raft/{replication_test.cc => replication.hh} (56%)

--
2.31.1

Alejo Sanchez

Jul 16, 2021, 11:58:51 AM
to scylla...@googlegroups.com, kos...@scylladb.com, gl...@scylladb.com, tgra...@scylladb.com, Alejo Sanchez
Move definitions to a helper object file.

Signed-off-by: Alejo Sanchez <alejo....@scylladb.com>
---
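In sketch form, the pattern this patch applies (identifiers are taken from
the diff below, but the snippet itself is an illustration, not the patch):
the header keeps declarations, the new object file holds the single
definitions, and each test binary now defines BOOST_TEST_MODULE itself,
since Boost.Test allows that macro in only one translation unit per
executable.

    // helpers.hh -- declarations only, safe to include from several TUs
    extern raft::fsm_config fsm_cfg;
    bool rolladice(float probability = 1.0/2.0);

    // helpers.cc -- the one definition of each helper
    raft::fsm_config fsm_cfg{.append_request_threshold = 1, .enable_prevoting = false};
    bool rolladice(float probability) {
        return tests::random::get_real(0.0, 1.0) < probability;
    }

    // fsm_test.cc (and etcd_test.cc) -- pick the Boost test module per binary
    #define BOOST_TEST_MODULE raft
    #include "test/raft/helpers.hh"
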
configure.py | 6 +-
test/raft/etcd_test.cc | 2 +
test/raft/fsm_test.cc | 2 +
test/raft/{helpers.hh => helpers.cc} | 107 ++------------------
test/raft/helpers.hh | 145 +++++----------------------
5 files changed, 39 insertions(+), 223 deletions(-)
copy test/raft/{helpers.hh => helpers.cc} (63%)

diff --git a/configure.py b/configure.py
index aa973a3e60..4bd29434d1 100755
--- a/configure.py
+++ b/configure.py
@@ -1253,10 +1253,10 @@ deps['test/boost/linearizing_input_stream_test'] = [
deps['test/boost/duration_test'] += ['test/lib/exception_utils.cc']
deps['test/boost/alternator_unit_test'] += ['alternator/base64.cc']

-deps['test/raft/replication_test'] = ['test/raft/replication_test.cc'] + scylla_minimal_raft_dependencies
+deps['test/raft/replication_test'] = ['test/raft/replication_test.cc', 'test/raft/helpers.cc'] + scylla_minimal_raft_dependencies
deps['test/raft/randomized_nemesis_test'] = ['test/raft/randomized_nemesis_test.cc'] + scylla_minimal_raft_dependencies
-deps['test/raft/fsm_test'] = ['test/raft/fsm_test.cc', 'test/lib/log.cc'] + scylla_minimal_raft_dependencies
-deps['test/raft/etcd_test'] = ['test/raft/etcd_test.cc', 'test/lib/log.cc'] + scylla_minimal_raft_dependencies
+deps['test/raft/fsm_test'] = ['test/raft/fsm_test.cc', 'test/raft/helpers.cc', 'test/lib/log.cc'] + scylla_minimal_raft_dependencies
+deps['test/raft/etcd_test'] = ['test/raft/etcd_test.cc', 'test/raft/helpers.cc', 'test/lib/log.cc'] + scylla_minimal_raft_dependencies
deps['test/raft/raft_sys_table_storage_test'] = ['test/raft/raft_sys_table_storage_test.cc'] + \
scylla_raft_dependencies + scylla_tests_generic_dependencies
deps['test/raft/raft_address_map_test'] = ['test/raft/raft_address_map_test.cc'] + scylla_raft_dependencies
diff --git a/test/raft/etcd_test.cc b/test/raft/etcd_test.cc
index 872ba8ccf0..c051eb595f 100644
--- a/test/raft/etcd_test.cc
+++ b/test/raft/etcd_test.cc
@@ -35,6 +35,8 @@

// Port of etcd Raft implementation unit tests

+#define BOOST_TEST_MODULE raft
+
#include "test/raft/helpers.hh"

using namespace raft;
diff --git a/test/raft/fsm_test.cc b/test/raft/fsm_test.cc
index 299eb0a0d4..d9a8f4e1ae 100644
--- a/test/raft/fsm_test.cc
+++ b/test/raft/fsm_test.cc
@@ -19,6 +19,8 @@
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/

+#define BOOST_TEST_MODULE raft
+
#include "test/raft/helpers.hh"

using namespace raft;
diff --git a/test/raft/helpers.hh b/test/raft/helpers.cc
similarity index 63%
copy from test/raft/helpers.hh
copy to test/raft/helpers.cc
index cc4d3f5e90..ae80bf5e7e 100644
--- a/test/raft/helpers.hh
+++ b/test/raft/helpers.cc
@@ -23,19 +23,12 @@
// Helper functions for raft tests
//

-#pragma once
+#include "helpers.hh"

-#define BOOST_TEST_MODULE raft
-
-#include <boost/test/unit_test.hpp>
-#include "test/lib/log.hh"
-#include "test/lib/random_utils.hh"
-#include "serializer_impl.hh"
-#include <limits>
-
-#include "raft/fsm.hh"
+raft::fsm_config fsm_cfg{.append_request_threshold = 1, .enable_prevoting = false};
+raft::fsm_config fsm_cfg_pre{.append_request_threshold = 1, .enable_prevoting = true};

-using seastar::make_lw_shared;
+struct trivial_failure_detector trivial_failure_detector;

void election_threshold(raft::fsm& fsm) {
// Election threshold should be strictly less than
@@ -60,64 +53,10 @@ void make_candidate(raft::fsm& fsm) {
}
}

-struct trivial_failure_detector: public raft::failure_detector {
- bool is_alive(raft::server_id from) override {
- return true;
- }
-} trivial_failure_detector;
-
-class discrete_failure_detector: public raft::failure_detector {
- bool _is_alive = true;
- std::unordered_set<raft::server_id> _dead;
-public:
- bool is_alive(raft::server_id id) override {
- return _is_alive && !_dead.contains(id);
- }
- void mark_dead(raft::server_id id) { _dead.emplace(id); }
- void mark_alive(raft::server_id id) { _dead.erase(id); }
- void mark_all_dead() { _is_alive = false; }
- void mark_all_alive() { _is_alive = true; }
-};
-
-template <typename T> void add_entry(raft::log& log, T cmd) {
- log.emplace_back(make_lw_shared<raft::log_entry>(raft::log_entry{log.last_term(), log.next_idx(), cmd}));
-}
-
raft::snapshot log_snapshot(raft::log& log, raft::index_t idx) {
return raft::snapshot{.idx = idx, .term = log.last_term(), .config = log.get_snapshot().config};
}

-template <typename T>
-raft::command create_command(T val) {
- raft::command command;
- ser::serialize(command, val);
-
- return command;
-}
-
-raft::fsm_config fsm_cfg{.append_request_threshold = 1, .enable_prevoting = false};
-raft::fsm_config fsm_cfg_pre{.append_request_threshold = 1, .enable_prevoting = true};
-
-class fsm_debug : public raft::fsm {
-public:
- using raft::fsm::fsm;
- void become_follower(raft::server_id leader) {
- raft::fsm::become_follower(leader);
- }
- const raft::follower_progress& get_progress(raft::server_id id) {
- raft::follower_progress* progress = leader_state().tracker.find(id);
- return *progress;
- }
- raft::log& get_log() {
- return raft::fsm::get_log();
- }
-
- bool leadership_transfer_active() const {
- assert(is_leader());
- return bool(leader_state().stepdown);
- }
-};
-
// NOTE: it doesn't compare data contents, just the data type
bool compare_log_entry(raft::log_entry_ptr le1, raft::log_entry_ptr le2) {
if (le1->term != le2->term || le1->idx != le2->idx || le1->data.index() != le2->data.index()) {
@@ -139,7 +78,8 @@ bool compare_log_entries(raft::log& log1, raft::log& log2, size_t from, size_t t

using raft_routing_map = std::unordered_map<raft::server_id, raft::fsm*>;

-bool deliver(raft_routing_map& routes, raft::server_id from, std::pair<raft::server_id, raft::rpc_message> m) {
+bool deliver(raft_routing_map& routes, raft::server_id from,
+ std::pair<raft::server_id, raft::rpc_message> m) {
auto it = routes.find(m.first);
if (it == routes.end()) {
// Destination not available
@@ -181,37 +121,6 @@ communicate_impl(std::function<bool()> stop_pred, raft_routing_map& map) {
} while (has_traffic);
}

-template <typename... Args>
-void communicate_until(std::function<bool()> stop_pred, Args&&... args) {
- raft_routing_map map;
- auto add_map_entry = [&map](raft::fsm& fsm) -> void {
- map.emplace(fsm.id(), &fsm);
- };
- (add_map_entry(args), ...);
- communicate_impl(stop_pred, map);
-}
-
-template <typename... Args>
-void communicate(Args&&... args) {
- return communicate_until([]() { return false; }, std::forward<Args>(args)...);
-}
-
-template <typename... Args>
-raft::fsm* select_leader(Args&&... args) {
- raft::fsm* leader = nullptr;
- auto assign_leader = [&leader](raft::fsm& fsm) {
- if (fsm.is_leader()) {
- leader = &fsm;
- return false;
- }
- return true;
- };
- (assign_leader(args) && ...);
- BOOST_CHECK(leader);
- return leader;
-}
-
-
raft::server_id id() {
static int id = 0;
return raft::server_id{utils::UUID(0, ++id)};
@@ -225,7 +134,7 @@ raft::server_address_set address_set(std::vector<raft::server_id> ids) {
return set;
}

-fsm_debug create_follower(raft::server_id id, raft::log log, raft::failure_detector& fd = trivial_failure_detector) {
+fsm_debug create_follower(raft::server_id id, raft::log log, raft::failure_detector& fd) {
return fsm_debug(id, raft::term_t{}, raft::server_id{}, std::move(log), fd, fsm_cfg);
}

@@ -250,6 +159,6 @@ size_t to_local_id(utils::UUID uuid) {
}

// Return true upon a random event with given probability
-bool rolladice(float probability = 1.0/2.0) {
+bool rolladice(float probability) {
return tests::random::get_real(0.0, 1.0) < probability;
}
diff --git a/test/raft/helpers.hh b/test/raft/helpers.hh
index cc4d3f5e90..b657c5175d 100644
--- a/test/raft/helpers.hh
+++ b/test/raft/helpers.hh
@@ -25,8 +25,6 @@

#pragma once

-#define BOOST_TEST_MODULE raft
-
#include <boost/test/unit_test.hpp>
#include "test/lib/log.hh"
#include "test/lib/random_utils.hh"
@@ -37,34 +35,16 @@

using seastar::make_lw_shared;

-void election_threshold(raft::fsm& fsm) {
- // Election threshold should be strictly less than
- // minimal randomized election timeout to make tests
- // stable, but enough to disable "stable leader" rule.
- for (int i = 0; i < raft::ELECTION_TIMEOUT.count(); i++) {
- fsm.tick();
- }
-}
-
-void election_timeout(raft::fsm& fsm) {
- for (int i = 0; i <= 2 * raft::ELECTION_TIMEOUT.count(); i++) {
- fsm.tick();
- }
-}
-
-void make_candidate(raft::fsm& fsm) {
- assert(fsm.is_follower());
- // NOTE: single node skips candidate state
- while (fsm.is_follower()) {
- fsm.tick();
- }
-}
+void election_threshold(raft::fsm& fsm);
+void election_timeout(raft::fsm& fsm);
+void make_candidate(raft::fsm& fsm);

struct trivial_failure_detector: public raft::failure_detector {
bool is_alive(raft::server_id from) override {
return true;
}
-} trivial_failure_detector;
+};
+extern struct trivial_failure_detector trivial_failure_detector;

class discrete_failure_detector: public raft::failure_detector {
bool _is_alive = true;
@@ -83,9 +63,7 @@ template <typename T> void add_entry(raft::log& log, T cmd) {
log.emplace_back(make_lw_shared<raft::log_entry>(raft::log_entry{log.last_term(), log.next_idx(), cmd}));
}

-raft::snapshot log_snapshot(raft::log& log, raft::index_t idx) {
- return raft::snapshot{.idx = idx, .term = log.last_term(), .config = log.get_snapshot().config};
-}
+raft::snapshot log_snapshot(raft::log& log, raft::index_t idx);

template <typename T>
raft::command create_command(T val) {
@@ -95,8 +73,8 @@ raft::command create_command(T val) {
return command;
}

-raft::fsm_config fsm_cfg{.append_request_threshold = 1, .enable_prevoting = false};
-raft::fsm_config fsm_cfg_pre{.append_request_threshold = 1, .enable_prevoting = true};
+extern raft::fsm_config fsm_cfg;
+extern raft::fsm_config fsm_cfg_pre;

class fsm_debug : public raft::fsm {
public:
@@ -119,67 +97,17 @@ class fsm_debug : public raft::fsm {
};

// NOTE: it doesn't compare data contents, just the data type
-bool compare_log_entry(raft::log_entry_ptr le1, raft::log_entry_ptr le2) {
- if (le1->term != le2->term || le1->idx != le2->idx || le1->data.index() != le2->data.index()) {
- return false;
- }
- return true;
-}
-
-bool compare_log_entries(raft::log& log1, raft::log& log2, size_t from, size_t to) {
- assert(to <= log1.last_idx());
- assert(to <= log2.last_idx());
- for (size_t i = from; i <= to; ++i) {
- if (!compare_log_entry(log1[i], log2[i])) {
- return false;
- }
- }
- return true;
-}
-
+bool compare_log_entry(raft::log_entry_ptr le1, raft::log_entry_ptr le2);
+bool compare_log_entries(raft::log& log1, raft::log& log2, size_t from, size_t to);
using raft_routing_map = std::unordered_map<raft::server_id, raft::fsm*>;

-bool deliver(raft_routing_map& routes, raft::server_id from, std::pair<raft::server_id, raft::rpc_message> m) {
- auto it = routes.find(m.first);
- if (it == routes.end()) {
- // Destination not available
- return false;
- }
- std::visit([from, &to = *it->second] (auto&& m) { to.step(from, std::move(m)); }, std::move(m.second));
- return true;
-}
-
-void deliver(raft_routing_map& routes, raft::server_id from, std::vector<std::pair<raft::server_id, raft::rpc_message>> msgs) {
- for (auto& m: msgs) {
- deliver(routes, from, std::move(m));
- }
-}
+bool deliver(raft_routing_map& routes, raft::server_id from,
+ std::pair<raft::server_id, raft::rpc_message> m);
+void deliver(raft_routing_map& routes, raft::server_id from,
+ std::vector<std::pair<raft::server_id, raft::rpc_message>> msgs);

void
-communicate_impl(std::function<bool()> stop_pred, raft_routing_map& map) {
- // To enable tracing, set:
- // global_logger_registry().set_all_loggers_level(seastar::log_level::trace);
- //
- bool has_traffic;
- do {
- has_traffic = false;
- for (auto e : map) {
- raft::fsm& from = *e.second;
- bool has_output;
- for (auto output = from.get_output(); !output.empty(); output = from.get_output()) {
- if (stop_pred()) {
- return;
- }
- for (auto&& m : output.messages) {
- has_traffic = true;
- if (deliver(map, from.id(), std::move(m)) && stop_pred()) {
- return;
- }
- }
- }
- }
- } while (has_traffic);
-}
+communicate_impl(std::function<bool()> stop_pred, raft_routing_map& map);

template <typename... Args>
void communicate_until(std::function<bool()> stop_pred, Args&&... args) {
@@ -212,44 +140,19 @@ raft::fsm* select_leader(Args&&... args) {
}


-raft::server_id id() {
- static int id = 0;
- return raft::server_id{utils::UUID(0, ++id)};
-}
-
-raft::server_address_set address_set(std::vector<raft::server_id> ids) {
- raft::server_address_set set;
- for (auto id : ids) {
- set.emplace(raft::server_address{.id = id});
- }
- return set;
-}
-
-fsm_debug create_follower(raft::server_id id, raft::log log, raft::failure_detector& fd = trivial_failure_detector) {
- return fsm_debug(id, raft::term_t{}, raft::server_id{}, std::move(log), fd, fsm_cfg);
-}
+raft::server_id id();
+raft::server_address_set address_set(std::vector<raft::server_id> ids);
+fsm_debug create_follower(raft::server_id id, raft::log log,
+ raft::failure_detector& fd = trivial_failure_detector);


// Raft uses UUID 0 as special case.
// Convert local 0-based integer id to raft +1 UUID
-utils::UUID to_raft_uuid(size_t local_id) {
- return utils::UUID{0, local_id + 1};
-}
-
-raft::server_id to_raft_id(size_t local_id) {
- return raft::server_id{to_raft_uuid(local_id)};
-}
+utils::UUID to_raft_uuid(size_t local_id);
+raft::server_id to_raft_id(size_t local_id);

// NOTE: can_vote = true
-raft::server_address to_server_address(size_t local_id) {
- return raft::server_address{raft::server_id{to_raft_uuid(local_id)}};
-}
-
-size_t to_local_id(utils::UUID uuid) {
- return uuid.get_least_significant_bits() - 1;
-}
-
+raft::server_address to_server_address(size_t local_id);
+size_t to_local_id(utils::UUID uuid);
// Return true upon a random event with given probability
-bool rolladice(float probability = 1.0/2.0) {
- return tests::random::get_real(0.0, 1.0) < probability;
-}
+bool rolladice(float probability = 1.0/2.0);
--
2.31.1

Alejo Sanchez

Jul 16, 2021, 11:58:54 AM
to scylla...@googlegroups.com, kos...@scylladb.com, gl...@scylladb.com, tgra...@scylladb.com, Alejo Sanchez
Common replication test code moved to a header.

Signed-off-by: Alejo Sanchez <alejo....@scylladb.com>
---
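A sketch of what the split enables (hypothetical file name and values, not
part of this patch): a new test translation unit includes replication.hh
and declares only its own scenarios through replication_test().

    // many_smoke_test.cc -- hypothetical consumer of replication.hh
    #include "replication.hh"

    SEASTAR_THREAD_TEST_CASE(many_nodes_smoke) {
        // 300 nodes, 10 entries; prevote and packet drops off (illustrative)
        replication_test(test_case{.nodes = 300, .updates = {entries{10}}},
                         false, false);
    }
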
.../{replication_test.cc => replication.hh} | 525 +------
test/raft/replication_test.cc | 1221 +----------------
2 files changed, 3 insertions(+), 1743 deletions(-)
copy test/raft/{replication_test.cc => replication.hh} (64%)

diff --git a/test/raft/replication_test.cc b/test/raft/replication.hh
similarity index 64%
copy from test/raft/replication_test.cc
copy to test/raft/replication.hh
index 9e86821bea..902c990880 100644
--- a/test/raft/replication_test.cc
+++ b/test/raft/replication.hh
@@ -19,6 +19,8 @@
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/

+#pragma once
+
#include <random>
#include <seastar/core/app-template.hh>
#include <seastar/core/sleep.hh>
@@ -1241,526 +1243,3 @@ future<> run_test(test_case test, bool prevote, bool packet_drops) {
void replication_test(struct test_case test, bool prevote, bool packet_drops) {
run_test(std::move(test), prevote, packet_drops).get();
}
-
-#define RAFT_TEST_CASE(test_name, test_body) \
- SEASTAR_THREAD_TEST_CASE(test_name) { \
- replication_test(test_body, false, false); } \
- SEASTAR_THREAD_TEST_CASE(test_name ## _drops) { \
- replication_test(test_body, false, true); } \
- SEASTAR_THREAD_TEST_CASE(test_name ## _prevote) { \
- replication_test(test_body, true, false); } \
- SEASTAR_THREAD_TEST_CASE(test_name ## _prevote_drops) { \
- replication_test(test_body, true, true); }
-
-// 1 nodes, simple replication, empty, no updates
-RAFT_TEST_CASE(simple_replication, (test_case{
- .nodes = 1}))
-
-// 2 nodes, 4 existing leader entries, 4 updates
-RAFT_TEST_CASE(non_empty_leader_log, (test_case{
- .nodes = 2,
- .initial_states = {{.le = {{1,0},{1,1},{1,2},{1,3}}}},
- .updates = {entries{4}}}));
-
-// 2 nodes, don't add more entries besides existing log
-RAFT_TEST_CASE(non_empty_leader_log_no_new_entries, (test_case{
- .nodes = 2, .total_values = 4,
- .initial_states = {{.le = {{1,0},{1,1},{1,2},{1,3}}}}}));
-
-// 1 nodes, 12 client entries
-RAFT_TEST_CASE(simple_1_auto_12, (test_case{
- .nodes = 1,
- .initial_states = {}, .updates = {entries{12}}}));
-
-// 1 nodes, 12 client entries
-RAFT_TEST_CASE(simple_1_expected, (test_case{
- .nodes = 1, .initial_states = {},
- .updates = {entries{4}}}));
-
-// 1 nodes, 7 leader entries, 12 client entries
-RAFT_TEST_CASE(simple_1_pre, (test_case{
- .nodes = 1,
- .initial_states = {{.le = {{1,0},{1,1},{1,2},{1,3},{1,4},{1,5},{1,6}}}},
- .updates = {entries{12}},}));
-
-// 2 nodes, 7 leader entries, 12 client entries
-RAFT_TEST_CASE(simple_2_pre, (test_case{
- .nodes = 2,
- .initial_states = {{.le = {{1,0},{1,1},{1,2},{1,3},{1,4},{1,5},{1,6}}}},
- .updates = {entries{12}},}));
-
-// 3 nodes, 2 leader changes with 4 client entries each
-RAFT_TEST_CASE(leader_changes, (test_case{
- .nodes = 3,
- .updates = {entries{4},new_leader{1},entries{4},new_leader{2},entries{4}}}));
-
-//
-// NOTE: due to disrupting candidates protection leader doesn't vote for others, and
-// servers with entries vote for themselves, so some tests use 3 servers instead of
-// 2 for simplicity and to avoid a stalemate. This behaviour can be disabled.
-//
-
-// 3 nodes, 7 leader entries, 12 client entries, change leader, 12 client entries
-RAFT_TEST_CASE(simple_3_pre_chg, (test_case{
- .nodes = 3, .initial_term = 2,
- .initial_states = {{.le = {{1,0},{1,1},{1,2},{1,3},{1,4},{1,5},{1,6}}}},
- .updates = {entries{12},new_leader{1},entries{12}},}));
-
-// 3 nodes, leader empty, follower has 3 spurious entries
-// node 1 was leader but did not propagate entries, node 0 becomes leader in new term
-// NOTE: on first leader election term is bumped to 3
-RAFT_TEST_CASE(replace_log_leaders_log_empty, (test_case{
- .nodes = 3, .initial_term = 2,
- .initial_states = {{}, {{{2,10},{2,20},{2,30}}}},
- .updates = {entries{4}}}));
-
-// 3 nodes, 7 leader entries, follower has 9 spurious entries
-RAFT_TEST_CASE(simple_3_spurious_1, (test_case{
- .nodes = 3, .initial_term = 2,
- .initial_states = {{.le = {{1,0},{1,1},{1,2},{1,3},{1,4},{1,5},{1,6}}},
- {{{2,10},{2,11},{2,12},{2,13},{2,14},{2,15},{2,16},{2,17},{2,18}}}},
- .updates = {entries{4}},}));
-
-// 3 nodes, term 3, leader has 9 entries, follower has 5 spurious entries, 4 client entries
-RAFT_TEST_CASE(simple_3_spurious_2, (test_case{
- .nodes = 3, .initial_term = 3,
- .initial_states = {{.le = {{1,0},{1,1},{1,2},{1,3},{1,4},{1,5},{1,6}}},
- {{{2,10},{2,11},{2,12},{2,13},{2,14}}}},
- .updates = {entries{4}},}));
-
-// 3 nodes, term 2, leader has 7 entries, follower has 3 good and 3 spurious entries
-RAFT_TEST_CASE(simple_3_follower_4_1, (test_case{
- .nodes = 3, .initial_term = 3,
- .initial_states = {{.le = {{1,0},{1,1},{1,2},{1,3},{1,4},{1,5},{1,6}}},
- {.le = {{1,0},{1,1},{1,2},{2,20},{2,30},{2,40}}}},
- .updates = {entries{4}}}));
-
-// A follower and a leader have matching logs but leader's is shorter
-// 3 nodes, term 2, leader has 2 entries, follower has same and 5 more, 12 updates
-RAFT_TEST_CASE(simple_3_short_leader, (test_case{
- .nodes = 3, .initial_term = 3,
- .initial_states = {{.le = {{1,0},{1,1}}},
- {.le = {{1,0},{1,1},{1,2},{1,3},{1,4},{1,5},{1,6}}}},
- .updates = {entries{12}}}));
-
-// A follower and a leader have no common entries
-// 3 nodes, term 2, leader has 7 entries, follower has non-matching 6 entries, 12 updates
-RAFT_TEST_CASE(follower_not_matching, (test_case{
- .nodes = 3, .initial_term = 3,
- .initial_states = {{.le = {{1,0},{1,1},{1,2},{1,3},{1,4},{1,5},{1,6}}},
- {.le = {{2,10},{2,20},{2,30},{2,40},{2,50},{2,60}}}},
- .updates = {entries{12}},}));
-
-// A follower and a leader have one common entry
-// 3 nodes, term 2, leader has 3 entries, follower has non-matching 3 entries, 12 updates
-RAFT_TEST_CASE(follower_one_common_1, (test_case{
- .nodes = 3, .initial_term = 4,
- .initial_states = {{.le = {{1,0},{1,1},{1,2}}},
- {.le = {{1,0},{2,11},{2,12},{2,13}}}},
- .updates = {entries{12}}}));
-
-// A follower and a leader have 2 common entries in different terms
-// 3 nodes, term 2, leader has 4 entries, follower has matching but in different term
-RAFT_TEST_CASE(follower_one_common_2, (test_case{
- .nodes = 3, .initial_term = 5,
- .initial_states = {{.le = {{1,0},{2,1},{3,2},{3,3}}},
- {.le = {{1,0},{2,1},{2,2},{2,13}}}},
- .updates = {entries{4}}}));
-
-// 2 nodes both taking snapshot while simple replication
-RAFT_TEST_CASE(take_snapshot, (test_case{
- .nodes = 2,
- .config = {{.snapshot_threshold = 10, .snapshot_trailing = 5}, {.snapshot_threshold = 20, .snapshot_trailing = 10}},
- .updates = {entries{100}}}));
-
-// 2 nodes doing simple replication/snapshoting while leader's log size is limited
-RAFT_TEST_CASE(backpressure, (test_case{
- .nodes = 2,
- .config = {{.snapshot_threshold = 10, .snapshot_trailing = 5, .max_log_size = 20}, {.snapshot_threshold = 20, .snapshot_trailing = 10}},
- .updates = {entries{100}}}));
-
-// 3 nodes, add entries, drop leader 0, add entries [implicit re-join all]
-RAFT_TEST_CASE(drops_01, (test_case{
- .nodes = 3,
- .updates = {entries{4},partition{1,2},entries{4}}}));
-
-// 3 nodes, add entries, drop follower 1, add entries [implicit re-join all]
-RAFT_TEST_CASE(drops_02, (test_case{
- .nodes = 3,
- .updates = {entries{4},partition{0,2},entries{4},partition{2,1}}}));
-
-// 3 nodes, add entries, drop leader 0, custom leader, add entries [implicit re-join all]
-RAFT_TEST_CASE(drops_03, (test_case{
- .nodes = 3,
- .updates = {entries{4},partition{leader{1},2},entries{4}}}));
-
-// 4 nodes, add entries, drop follower 1, custom leader, add entries [implicit re-join all]
-RAFT_TEST_CASE(drops_04, (test_case{
- .nodes = 4,
- .updates = {entries{4},partition{0,2,3},entries{4},partition{1,leader{2},3}}}));
-
-// Repro for dueling candidate for non-prevote scenarios where
-// node (0) is dropped and becomes candidate bumping its term over and over.
-// Meanwhile another node (2) becomes leader and adds entry.
-// When dropped (0) rejoins, followers (1,3) ignore it (leader up and no timeout)
-// but they should not ignore vote requests by current leader (2)
-// or else the elections never succeed
-// Note: for it to hang there has to be 4+ total nodes so
-// 2 dueling candidates don't have enough quorum to resolve election
-RAFT_TEST_CASE(drops_04_dueling_repro, (test_case{
- .nodes = 4,
- .updates = {entries{1},partition{0,2,3},entries{1}, // 0 leader
- partition{1,leader{2},3},entries{1}, // 0 dropped, 2 leader
- tick{40}, // 0 becomes candidate, bumps term
- partition{0,1,2,3},entries{1}, // 0 re-joinin and 0 disrupts
- }}));
-
-// TODO: change to RAFT_TEST_CASE once it's stable for handling packet drops
-SEASTAR_THREAD_TEST_CASE(test_take_snapshot_and_stream) {
- replication_test(
- // Snapshot automatic take and load
- {.nodes = 3,
- .config = {{.snapshot_threshold = 10, .snapshot_trailing = 5}},
- .updates = {entries{5}, partition{0,1}, entries{10}, partition{0, 2}, entries{20}}}
- , false, false);
-}
-
-// Check removing all followers, add entry, bring back one follower and make it leader
-RAFT_TEST_CASE(conf_changes_1, (test_case{
- .nodes = 3,
- .updates = {set_config{0}, entries{1}, set_config{0,1}, entries{1},
- new_leader{1}, entries{1}}}));
-
-// Check removing leader with entries, add entries, remove follower and add back first node
-RAFT_TEST_CASE(conf_changes_2, (test_case{
- .nodes = 3,
- .updates = {entries{1}, new_leader{1}, set_config{1,2}, entries{1},
- set_config{0,1}, entries{1}}}));
-
-// Check removing a node from configuration, adding entries; cycle for all combinations
-SEASTAR_THREAD_TEST_CASE(remove_node_cycle) {
- replication_test(
- {.nodes = 4,
- .updates = {set_config{0,1,2}, entries{2}, new_leader{1},
- set_config{1,2,3}, entries{2}, new_leader{2},
- set_config{2,3,0}, entries{2}, new_leader{3},
- // TODO: find out why it breaks in release mode
- // set_config{3,0,1}, entries{2}, new_leader{0}
- }}
- , false, false);
-}
-
-SEASTAR_THREAD_TEST_CASE(test_leader_change_during_snapshot_transfere) {
- replication_test(
- {.nodes = 3,
- .initial_snapshots = {{.snap = {.idx = raft::index_t(10),
- .term = raft::term_t(1),
- .id = delay_send_snapshot}},
- {.snap = {.idx = raft::index_t(10),
- .term = raft::term_t(1),
- .id = delay_apply_snapshot}}},
- .updates = {tick{10} /* ticking starts snapshot transfer */, new_leader{1}, entries{10}}}
- , false, false);
-}
-
-// verifies that each node in a cluster can campaign
-// and be elected in turn. This ensures that elections work when not
-// starting from a clean slate (as they do in TestLeaderElection)
-// TODO: add pre-vote case
-RAFT_TEST_CASE(etcd_test_leader_cycle, (test_case{
- .nodes = 2,
- .updates = {new_leader{1},new_leader{0},
- new_leader{1},new_leader{0},
- new_leader{1},new_leader{0},
- new_leader{1},new_leader{0}
- }}));
-
-///
-/// RPC-related tests
-///
-
-// 1 node cluster with an initial configuration from a snapshot.
-// Test that RPC configuration is set up correctly when the raft server
-// instance is started.
-RAFT_TEST_CASE(rpc_load_conf_from_snapshot, (test_case{
- .nodes = 1, .total_values = 0,
- .initial_snapshots = {{.snap = {
- .config = raft::configuration{to_raft_id(0)}}}},
- .updates = {check_rpc_config{node_id{0},
- rpc_address_set{node_id{0}}}
- }}));
-
-// 1 node cluster.
-// Initial configuration is taken from the persisted log.
-RAFT_TEST_CASE(rpc_load_conf_from_log, (test_case{
- .nodes = 1, .total_values = 0,
- .initial_states = {{.le = {{1, raft::configuration{to_raft_id(0)}}}}},
- .updates = {check_rpc_config{node_id{0},
- rpc_address_set{node_id{0}}}
- }}));
-
-
-// 3 node cluster {A, B, C}.
-// Shrinked later to 2 nodes and then expanded back to 3 nodes.
-// Test that both configuration changes update RPC configuration correspondingly
-// on all nodes.
-RAFT_TEST_CASE(rpc_propose_conf_change, (test_case{
- .nodes = 3, .total_values = 0,
- .updates = {
- // Remove node C from the cluster configuration.
- set_config{0,1},
- // Check that RPC config is updated both on leader and on follower nodes,
- // i.e. `rpc::remove_server` is called.
- check_rpc_config{{node_id{0},node_id{1}},
- rpc_address_set{node_id{0},node_id{1}}},
- // Re-add node C to the cluster configuration.
- set_config{0,1,2},
- // Check that both A (leader) and B (follower) call `rpc::add_server`,
- // also the newly integrated node gets the actual RPC configuration, too.
- check_rpc_config{{node_id{0},node_id{1},node_id{2}},
- rpc_address_set{node_id{0},node_id{1},node_id{2}}},
- }}));
-
-// 3 node cluster {A, B, C}.
-// Test that leader elections don't change RPC configuration.
-RAFT_TEST_CASE(rpc_leader_election, (test_case{
- .nodes = 3, .total_values = 0,
- .updates = {
- check_rpc_config{{node_id{0},node_id{1},node_id{2}},
- rpc_address_set{node_id{0},node_id{1},node_id{2}}},
- // Elect 2nd node a leader
- new_leader{1},
- check_rpc_config{{node_id{0},node_id{1},node_id{2}},
- rpc_address_set{node_id{0},node_id{1},node_id{2}}},
- }}));
-
-// 3 node cluster {A, B, C}.
-// Test that demoting of node C to learner state and then promoting back
-// to voter doesn't involve any RPC configuration changes.
-RAFT_TEST_CASE(rpc_voter_non_voter_transision, (test_case{
- .nodes = 3, .total_values = 0,
- .updates = {
- check_rpc_config{{node_id{0},node_id{1},node_id{2}},
- rpc_address_set{node_id{0},node_id{1},node_id{2}}},
- rpc_reset_counters{{node_id{0},node_id{1},node_id{2}}},
- // Make C a non-voting member.
- set_config{0, 1, set_config_entry(2, false)},
- // Check that RPC configuration didn't change.
- check_rpc_added{{node_id{0},node_id{1},node_id{2}},0},
- check_rpc_removed{{node_id{0},node_id{1},node_id{2}},0},
- // Make C a voting member.
- set_config{0, 1, 2},
- // RPC configuration shouldn't change.
- check_rpc_added{{node_id{0},node_id{1},node_id{2}},0},
- check_rpc_removed{{node_id{0},node_id{1},node_id{2}},0},
- }}));
-
-// 3 node cluster {A, B, C}.
-// Issue a configuration change on leader (A): add node D.
-// Fail the node before the entry is committed (disconnect from the
-// rest of the cluster and restart the node).
-//
-// In the meanwhile, elect a new leader within the connected part of the
-// cluster (B). A becomes an isolated follower in this case.
-// A should observe {A, B, C, D} RPC configuration: when in joint
-// consensus, we need to account for servers in both configurations.
-//
-// Heal network partition and observe that A's log is truncated (actually,
-// it's empty since B does not have any entries at all, except for dummies).
-// The RPC configuration on A is restored from initial snapshot configuration,
-// which is {A, B, C}.
-
-RAFT_TEST_CASE(rpc_configuration_truncate_restore_from_snp, (test_case{
- .nodes = 3, .total_values = 0,
- .updates = {
-
- // Disconnect A from B and C.
- partition{1,2},
- // Emulate a failed configuration change on A (add node D) by
- // restarting A with a modified initial log containing one extraneous
- // configuration entry.
- stop{0},
- // Restart A with a synthetic initial state representing
- // the same initial snapshot config (A, B, C) as before,
- // but with the following things in mind:
- // * log contains only one entry: joint configuration entry
- // that is equivalent to that of A's before the crash.
- // * The configuration entry would have term=1 so that it'll
- // be truncated when A gets in contact with other nodes
- // * This will completely erase all entries on A leaving its
- // log empty.
- reset{.id = 0, .state = {
- .log = { raft::log_entry{raft::term_t(1), raft::index_t(1),
- config{.curr = {node_id{0},node_id{1},node_id{2},node_id{3}},
- .prev = {node_id{0},node_id{1},node_id{2}}}}},
- .snapshot = {.config = address_set({node_id{0},node_id{1},node_id{2}})
- }
- }},
- // A should see {A, B, C, D} as RPC config since
- // the latest configuration entry points to joint
- // configuration {.current = {A, B, C, D}, .previous = {A, B, C}}.
- // RPC configuration is computed as a union of current
- // and previous configurations.
- check_rpc_config{node_id{0},
- rpc_address_set{node_id{0},node_id{1},node_id{2},node_id{3}}},
-
- // Elect B as leader
- new_leader{1},
-
- // Heal network partition.
- partition{0,1,2},
-
- // wait to synchronize logs between current leader (B) and A
- wait_log{0},
-
- // A should have truncated an offending configuration entry and revert its RPC configuration.
- //
- // Since B's log is effectively empty (does not contain any configuration
- // entries), A's configuration view ({A, B, C}) is restored from
- // initial snapshot.
- check_rpc_config{node_id{0},
- rpc_address_set{node_id{0},node_id{1},node_id{2}}}}}));
-
-// 4 node cluster {A, B, C, D}.
-// Change configuration to {A, B, C} from A and wait for it to become
-// committed.
-//
-// Then, issue a configuration change on leader (A): remove node C.
-// Fail the node before the entry is committed (disconnect from the
-// rest of the cluster and restart the node). We emulate this behavior by
-// just terminating the node and restarting it with a pre-defined state
-// that is equivalent to having an uncommitted configuration entry in
-// the log.
-//
-// In the meanwhile, elect a new leader within the connected part of the
-// cluster (B). A becomes an isolated follower in this case.
-//
-// Heal network partition and observe that A's log is truncated and
-// replaced with that of B. RPC configuration should not change between
-// the crash + network partition and synchronization with B, since
-// the effective RPC cfg would be {A, B, C} both for
-// joint cfg = {.current = {A, B}, .previous = {A, B, C}}
-// and the previously commited cfg = {A, B, C}.
-//
-// After that, test for the second case: switch leader back to A and
-// try to expand the cluster back to initial state (re-add
-// node D): {A, B, C, D}.
-//
-// Try to set configuration {A, B, C, D} on leader A, isolate and crash it.
-// Restart with synthetic state containing an uncommitted configuration entry.
-//
-// This time before healing the network we should observe
-// RPC configuration = {A, B, C, D}, accounting for an uncommitted part of the
-// configuration.
-// After healing the network and synchronizing with new leader B, RPC config
-// should be reverted back to committed state {A, B, C}.
-RAFT_TEST_CASE(rpc_configuration_truncate_restore_from_log, (test_case{
- .nodes = 4, .total_values = 0,
- .updates = {
-
- // Remove node D from the cluster configuration.
- set_config{0, 1, 2},
- // {A, B, C} configuration is committed by now.
-
- //
- // First case: shrink cluster (remove node C).
- //
-
- // Disconnect A from the rest of the cluster.
- partition{1,2,3},
- // Try to change configuration (remove node C)
- // `set_configuration` call will fail on A because
- // it's cut off the other nodes and it will be waiting for them,
- // but A is terminated before the network is allowed to heal the partition.
- stop{0},
- // Restart A with a synthetic initial state that contains two entries
- // in the log:
- // 1. {A, B, C} configuration committed before crash + partition.
- // 2. uncommitted joint configuration entry that is equivalent
- // to that of A's before the crash.
- reset{.id = 0, .state = {
- .log = {
- // First term committed conf {A, B, C}
- raft::log_entry{raft::term_t(1), raft::index_t(1),
- config{.curr = {node_id{0},node_id{1},node_id{2}}}},
- // Second term (uncommitted) {A, B} and prev committed {A, B, C}
- raft::log_entry{raft::term_t(2), raft::index_t(2),
- config{.curr = {node_id{0},node_id{1}},
- .prev = {node_id{0},node_id{1},node_id{2}}}
- },
- },
- // all nodes in snapshot config {A, B, C, D} (original)
- .snapshot = {.config = address_set({node_id{0},node_id{1},node_id{2},node_id{3}})
- }
- }},
-
- // A's RPC configuration should stay the same because
- // for both uncommitted joint cfg = {.current = {A, B}, .previous = {A, B, C}}
- // and committed cfg = {A, B, C} the RPC cfg would be equal to {A, B, C}
- check_rpc_config{node_id{0},
- rpc_address_set{node_id{0},node_id{1},node_id{2}}},
-
- // Elect B as leader
- new_leader{1},
-
- // Heal network partition. Connect all.
- partition{0,1,2,3},
-
- wait_log{0,2},
-
- // Again, A's RPC configuration is the same as before despite the
- // real cfg being reverted to the committed state as it is the union
- // between current and previous configurations in case of
- // joint cfg, anyway.
- check_rpc_config{{node_id{0},node_id{1},node_id{2}},
- rpc_address_set{node_id{0},node_id{1},node_id{2}}},
-
- //
- // Second case: expand cluster (re-add node D).
- //
-
- // Elect A leader again.
- new_leader{0},
- wait_log{1,2},
-
- // Disconnect A from the rest of the cluster.
- partition{1,2,3},
-
- // Try to add D back.
- stop{0},
- reset{.id = 0, .state = {
- .log = {
- // First term committed conf {A, B, C}
- raft::log_entry{raft::term_t(1), raft::index_t(1),
- config{.curr = {node_id{0},node_id{1},node_id{2}}}},
- // Second term (all) {A, B, C, D} and prev committed {A, B, C}
- raft::log_entry{raft::term_t(2), raft::index_t(2),
- config{.curr = {node_id{0},node_id{1},node_id{2},node_id{3}},
- .prev = {node_id{0},node_id{1},node_id{2}}}
- },
- },
- // all nodes in snapshot config {A, B, C, D} (original)
- .snapshot = {.config = address_set({node_id{0},node_id{1},node_id{2},node_id{3}})
- }
- }},
-
- // A should observe RPC configuration = {A, B, C, D} since it's the union
- // of an uncommitted joint config components
- // {.current = {A, B, C, D}, .previous = {A, B, C}}.
- check_rpc_config{node_id{0},
- rpc_address_set{node_id{0},node_id{1},node_id{2},node_id{3}}},
-
- // Elect B as leader
- new_leader{1},
-
- // Heal network partition. Connect all.
- partition{0,1,2,3},
-
- // wait to synchronize logs between current leader (B) and the rest of the cluster
- wait_log{0,2},
-
- // A's RPC configuration is reverted to committed configuration {A, B, C}.
- check_rpc_config{{node_id{0},node_id{1},node_id{2}},
- rpc_address_set{node_id{0},node_id{1},node_id{2}}},
- }}));
-
diff --git a/test/raft/replication_test.cc b/test/raft/replication_test.cc
index 9e86821bea..4bdbb133ce 100644
--- a/test/raft/replication_test.cc
+++ b/test/raft/replication_test.cc
@@ -19,1228 +19,9 @@
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/

-#include <random>
-#include <seastar/core/app-template.hh>
-#include <seastar/core/sleep.hh>
-#include <seastar/core/coroutine.hh>
-#include <seastar/core/loop.hh>
-#include <seastar/util/log.hh>
-#include <seastar/util/later.hh>
-#include <seastar/util/variant_utils.hh>
-#include <seastar/testing/random.hh>
-#include <seastar/testing/thread_test_case.hh>
-#include <seastar/testing/test_case.hh>
-#include "raft/server.hh"
-#include "serializer.hh"
-#include "serializer_impl.hh"
-#include "xx_hasher.hh"
-#include "test/raft/helpers.hh"
-#include "test/lib/eventually.hh"
+#include "replication.hh"

// Test Raft library with declarative test definitions
-//
-// Each test can be defined by (struct test_case):
-// .nodes number of nodes
-// .total_values how many entries to append to leader nodes (default 100)
-// .initial_term initial term # for setup
-// .initial_leader what server is leader
-// .initial_states initial logs of servers
-// .le log entries
-// .initial_snapshots snapshots present at initial state for servers
-// .updates updates to execute on these servers
-// entries{x} add the following x entries to the current leader
-// new_leader{x} elect x as new leader
-// partition{a,b,c} Only servers a,b,c are connected
-// partition{a,leader{b},c} Only servers a,b,c are connected, and make b leader
-// set_config{a,b,c} Change configuration on leader
-// set_config{a,b,c} Change configuration on leader
-// check_rpc_config{a,cfg} Check rpc config of a matches
-// check_rpc_config{[],cfg} Check rpc config multiple nodes matches
-//
-// run_test
-// - Creates the servers and initializes logs and snapshots
-// with hasher/digest and tickers to advance servers
-// - Processes updates one by one
-// - Appends remaining values
-// - Waits until all servers have logs of size of total_values entries
-// - Verifies hash
-// - Verifies persisted snapshots
-//
-// Tests are run also with 20% random packet drops.
-// Two test cases are created for each with the macro
-// RAFT_TEST_CASE(<test name>, <test case>)
-
-using namespace std::chrono_literals;
-using namespace std::placeholders;
-
-static seastar::logger tlogger("test");
-
-lowres_clock::duration tick_delta = 1ms;
-
-auto dummy_command = std::numeric_limits<int>::min();
-
-class hasher_int : public xx_hasher {
-public:
- using xx_hasher::xx_hasher;
- void update(int val) noexcept {
- xx_hasher::update(reinterpret_cast<const char *>(&val), sizeof(val));
- }
- static hasher_int hash_range(int max) {
- hasher_int h;
- for (int i = 0; i < max; ++i) {
- h.update(i);
- }
- return h;
- }
-};
-
-struct snapshot_value {
- hasher_int hasher;
- raft::index_t idx;
-};
-
-struct initial_state {
- raft::server_address address;
- raft::term_t term = raft::term_t(1);
- raft::server_id vote;
- std::vector<raft::log_entry> log;
- raft::snapshot snapshot;
- snapshot_value snp_value;
- raft::server::configuration server_config = raft::server::configuration{.append_request_threshold = 200};
-};
-
-// For verbosity in test declaration (i.e. node_id{x})
-struct node_id {
- size_t id;
-};
-
-
-std::vector<raft::server_id> to_raft_id_vec(std::vector<node_id> nodes) {
- std::vector<raft::server_id> ret;
- for (auto node: nodes) {
- ret.push_back(raft::server_id{to_raft_uuid(node.id)});
- }
- return ret;
-}
-
-raft::server_address_set address_set(std::vector<node_id> nodes) {
- return address_set(to_raft_id_vec(nodes));
-}
-
-// Updates can be
-// - Entries
-// - Leader change
-// - Configuration change
-struct entries {
- size_t n;
-};
-struct new_leader {
- size_t id;
-};
-struct leader {
- size_t id;
-};
-using partition = std::vector<std::variant<leader,int>>;
-
-// Disconnect 2 servers both ways
-struct two_nodes {
- size_t first;
- size_t second;
-};
-struct disconnect : public two_nodes {};
-
-struct stop {
- size_t id;
-};
-
-struct reset {
- size_t id;
- initial_state state;
-};
-
-struct wait_log {
- std::vector<size_t> local_ids;
- wait_log(size_t local_id) : local_ids({local_id}) {}
- wait_log(std::initializer_list<size_t> local_ids) : local_ids(local_ids) {}
-};
-
-struct set_config_entry {
- size_t node_idx;
- bool can_vote;
-
- set_config_entry(size_t idx, bool can_vote = true)
- : node_idx(idx), can_vote(can_vote)
- {}
-};
-using set_config = std::vector<set_config_entry>;
-
-struct config {
- std::vector<node_id> curr;
- std::vector<node_id> prev;
- operator raft::configuration() {
- auto current = address_set(curr);
- auto previous = address_set(prev);
- return raft::configuration{current, previous};
- }
-};
-
-using rpc_address_set = std::vector<node_id>;
-
-struct check_rpc_config {
- std::vector<node_id> nodes;
- rpc_address_set addrs;
- check_rpc_config(node_id node, rpc_address_set addrs) : nodes({node}), addrs(addrs) {}
- check_rpc_config(std::vector<node_id> nodes, rpc_address_set addrs) : nodes(nodes), addrs(addrs) {}
-};
-
-struct check_rpc_added {
- std::vector<node_id> nodes;
- size_t expected;
- check_rpc_added(node_id node, size_t expected) : nodes({node}), expected(expected) {}
- check_rpc_added(std::vector<node_id> nodes, size_t expected) : nodes(nodes), expected(expected) {}
-};
-
-struct check_rpc_removed {
- std::vector<node_id> nodes;
- size_t expected;
- check_rpc_removed(node_id node, size_t expected) : nodes({node}), expected(expected) {}
- check_rpc_removed(std::vector<node_id> nodes, size_t expected) : nodes(nodes), expected(expected) {}
-};
-
-using rpc_reset_counters = std::vector<node_id>;
-
-struct tick {
- uint64_t ticks;
-};
-
-using update = std::variant<entries, new_leader, partition, disconnect, stop, reset, wait_log,
- set_config, check_rpc_config, check_rpc_added, check_rpc_removed, rpc_reset_counters,
- tick>;
-
-struct log_entry {
- unsigned term;
- std::variant<int, raft::configuration> data;
-};
-
-struct initial_log {
- std::vector<log_entry> le;
-};
-
-struct initial_snapshot {
- raft::snapshot snap;
-};
-
-struct test_case {
- const size_t nodes;
- const size_t total_values = 100;
- uint64_t initial_term = 1;
- const size_t initial_leader = 0;
- const std::vector<struct initial_log> initial_states;
- const std::vector<struct initial_snapshot> initial_snapshots;
- const std::vector<raft::server::configuration> config;
- const std::vector<update> updates;
- size_t get_first_val();
-};
-
-size_t test_case::get_first_val() {
- // Count existing leader snap index and entries, if present
- size_t first_val = 0;
- if (initial_leader < initial_states.size()) {
- first_val += initial_states[initial_leader].le.size();
- }
- if (initial_leader < initial_snapshots.size()) {
- first_val = initial_snapshots[initial_leader].snap.idx;
- }
- return first_val;
-}
-
-std::mt19937 random_generator() {
- auto& gen = seastar::testing::local_random_engine;
- return std::mt19937(gen());
-}
-
-int rand() {
- static thread_local std::uniform_int_distribution<int> dist(0, std::numeric_limits<uint8_t>::max());
- static thread_local auto gen = random_generator();
-
- return dist(gen);
-}
-
-// Lets assume one snapshot per server
-using snapshots = std::unordered_map<raft::server_id, snapshot_value>;
-using persisted_snapshots = std::unordered_map<raft::server_id, std::pair<raft::snapshot, snapshot_value>>;
-
-seastar::semaphore snapshot_sync(0);
-// application of a snaphot with that id will be delayed until snapshot_sync is signaled
-raft::snapshot_id delay_apply_snapshot{utils::UUID(0, 0xdeadbeaf)};
-// sending of a snaphot with that id will be delayed until snapshot_sync is signaled
-raft::snapshot_id delay_send_snapshot{utils::UUID(0xdeadbeaf, 0)};
-
-class raft_cluster {
- using apply_fn = std::function<size_t(raft::server_id id, const std::vector<raft::command_cref>& commands, lw_shared_ptr<hasher_int> hasher)>;
- class state_machine;
- class persistence;
- class connected;
- class failure_detector;
- class rpc;
- struct test_server {
- std::unique_ptr<raft::server> server;
- state_machine* sm;
- raft_cluster::rpc* rpc;
- };
- std::vector<test_server> _servers;
- std::unique_ptr<connected> _connected;
- std::unique_ptr<snapshots> _snapshots;
- std::unique_ptr<persisted_snapshots> _persisted_snapshots;
- size_t _apply_entries;
- size_t _next_val;
- bool _packet_drops;
- bool _prevote;
- apply_fn _apply;
- std::unordered_set<size_t> _in_configuration; // Servers in current configuration
- std::vector<seastar::timer<lowres_clock>> _tickers;
- size_t _leader;
- std::vector<initial_state> get_states(test_case test, bool prevote);
-public:
- raft_cluster(test_case test,
- apply_fn apply,
- size_t apply_entries, size_t first_val, size_t first_leader,
- bool prevote, bool packet_drops);
- // No copy
- raft_cluster(const raft_cluster&) = delete;
- raft_cluster(raft_cluster&&) = default;
- future<> stop_server(size_t id);
- future<> reset_server(size_t id, initial_state state); // Reset a stopped server
- size_t size() {
- return _servers.size();
- }
- future<> start_all();
- future<> stop_all();
- future<> wait_all();
- void tick_all();
- void disconnect(size_t id, std::optional<raft::server_id> except = std::nullopt);
- void connect_all();
- void elapse_elections();
- future<> elect_new_leader(size_t new_leader);
- future<> free_election();
- void init_raft_tickers();
- void pause_tickers();
- void restart_tickers();
- void cancel_ticker(size_t id);
- void set_ticker_callback(size_t id) noexcept;
- future<> add_entries(size_t n);
- future<> add_remaining_entries();
- future<> wait_log(size_t follower);
- future<> wait_log(::wait_log followers);
- future<> wait_log_all();
- future<> change_configuration(::set_config sc);
- future<> check_rpc_config(::check_rpc_config cc);
- void check_rpc_added(::check_rpc_added expected) const;
- void check_rpc_removed(::check_rpc_removed expected) const;
- void rpc_reset_counters(::rpc_reset_counters nodes);
- future<> reconfigure_all();
- future<> partition(::partition p);
- future<> tick(::tick t);
- future<> stop(::stop server);
- future<> reset(::reset server);
- void disconnect(::disconnect nodes);
- void verify();
-private:
- test_server create_server(size_t id, initial_state state);
-};
-
-class raft_cluster::state_machine : public raft::state_machine {
- raft::server_id _id;
- apply_fn _apply;
- size_t _apply_entries;
- size_t _seen = 0;
- promise<> _done;
- snapshots* _snapshots;
-public:
- lw_shared_ptr<hasher_int> hasher;
- state_machine(raft::server_id id, apply_fn apply, size_t apply_entries,
- snapshots* snapshots):
- _id(id), _apply(std::move(apply)), _apply_entries(apply_entries), _snapshots(snapshots),
- hasher(make_lw_shared<hasher_int>()) {}
- future<> apply(const std::vector<raft::command_cref> commands) override {
- auto n = _apply(_id, commands, hasher);
- _seen += n;
- if (n && _seen == _apply_entries) {
- _done.set_value();
- }
- tlogger.debug("sm::apply[{}] got {}/{} entries", _id, _seen, _apply_entries);
- return make_ready_future<>();
- }
-
- future<raft::snapshot_id> take_snapshot() override {
- (*_snapshots)[_id].hasher = *hasher;
- tlogger.debug("sm[{}] takes snapshot {}", _id, (*_snapshots)[_id].hasher.finalize_uint64());
- (*_snapshots)[_id].idx = raft::index_t{_seen};
- return make_ready_future<raft::snapshot_id>(raft::snapshot_id::create_random_id());
- }
- void drop_snapshot(raft::snapshot_id id) override {
- (*_snapshots).erase(_id);
- }
- future<> load_snapshot(raft::snapshot_id id) override {
- hasher = make_lw_shared<hasher_int>((*_snapshots)[_id].hasher);
- tlogger.debug("sm[{}] loads snapshot {}", _id, (*_snapshots)[_id].hasher.finalize_uint64());
- _seen = (*_snapshots)[_id].idx;
- if (_seen >= _apply_entries) {
- _done.set_value();
- }
- if (id == delay_apply_snapshot) {
- snapshot_sync.signal();
- co_await snapshot_sync.wait();
- }
- co_return;
- };
- future<> abort() override { return make_ready_future<>(); }
-
- future<> done() {
- return _done.get_future();
- }
-};
-
-class raft_cluster::persistence : public raft::persistence {
- raft::server_id _id;
- initial_state _conf;
- snapshots* _snapshots;
- persisted_snapshots* _persisted_snapshots;
-public:
- persistence(raft::server_id id, initial_state conf, snapshots* snapshots,
- persisted_snapshots* persisted_snapshots) : _id(id),
- _conf(std::move(conf)), _snapshots(snapshots),
- _persisted_snapshots(persisted_snapshots) {}
- persistence() {}
- virtual future<> store_term_and_vote(raft::term_t term, raft::server_id vote) { return seastar::sleep(1us); }
- virtual future<std::pair<raft::term_t, raft::server_id>> load_term_and_vote() {
- auto term_and_vote = std::make_pair(_conf.term, _conf.vote);
- return make_ready_future<std::pair<raft::term_t, raft::server_id>>(term_and_vote);
- }
- virtual future<> store_snapshot(const raft::snapshot& snap, size_t preserve_log_entries) {
- (*_persisted_snapshots)[_id] = std::make_pair(snap, (*_snapshots)[_id]);
- tlogger.debug("sm[{}] persists snapshot {}", _id, (*_snapshots)[_id].hasher.finalize_uint64());
- return make_ready_future<>();
- }
- future<raft::snapshot> load_snapshot() override {
- return make_ready_future<raft::snapshot>(_conf.snapshot);
- }
- virtual future<> store_log_entries(const std::vector<raft::log_entry_ptr>& entries) { return seastar::sleep(1us); };
- virtual future<raft::log_entries> load_log() {
- raft::log_entries log;
- for (auto&& e : _conf.log) {
- log.emplace_back(make_lw_shared(std::move(e)));
- }
- return make_ready_future<raft::log_entries>(std::move(log));
- }
- virtual future<> truncate_log(raft::index_t idx) { return make_ready_future<>(); }
- virtual future<> abort() { return make_ready_future<>(); }
-};
-
-struct raft_cluster::connected {
- struct connection {
- raft::server_id from;
- raft::server_id to;
- bool operator==(const connection &o) const {
- return from == o.from && to == o.to;
- }
- };
-
- struct hash_connection {
- std::size_t operator() (const connection &c) const {
- return std::hash<utils::UUID>()(c.from.id);
- }
- };
-
- // Map of from->to disconnections
- std::unordered_set<connection, hash_connection> disconnected;
- size_t n;
- connected(size_t n) : n(n) { }
- // Cut connectivity of two servers both ways
- void cut(raft::server_id id1, raft::server_id id2) {
- disconnected.insert({id1, id2});
- disconnected.insert({id2, id1});
- }
- // Isolate a server
- void disconnect(raft::server_id id, std::optional<raft::server_id> except = std::nullopt) {
- for (size_t other = 0; other < n; ++other) {
- auto other_id = to_raft_id(other);
- // Disconnect if not the same, and the other id is not an exception
- // disconnect(0, except=1)
- if (id != other_id && !(except && other_id == *except)) {
- cut(id, other_id);
- }
- }
- }
- // Re-connect a node to all other nodes
- void connect(raft::server_id id) {
- for (auto it = disconnected.begin(); it != disconnected.end(); ) {
- if (id == it->from || id == it->to) {
- it = disconnected.erase(it);
- } else {
- ++it;
- }
- }
- }
- void connect_all() {
- disconnected.clear();
- }
- bool operator()(raft::server_id id1, raft::server_id id2) {
- // It's connected if both ways are not disconnected
- return !disconnected.contains({id1, id2}) && !disconnected.contains({id1, id2});
- }
-};
-
-class raft_cluster::failure_detector : public raft::failure_detector {
- raft::server_id _id;
- connected* _connected;
-public:
- failure_detector(raft::server_id id, connected* connected) : _id(id), _connected(connected) {}
- bool is_alive(raft::server_id server) override {
- return (*_connected)(server, _id);
- }
-};
-
-class raft_cluster::rpc : public raft::rpc {
- static std::unordered_map<raft::server_id, rpc*> net;
- raft::server_id _id;
- connected* _connected;
- snapshots* _snapshots;
- bool _packet_drops;
- raft::server_address_set _known_peers;
- uint32_t _servers_added = 0;
- uint32_t _servers_removed = 0;
-public:
- rpc(raft::server_id id, connected* connected, snapshots* snapshots,
- bool packet_drops) : _id(id), _connected(connected), _snapshots(snapshots),
- _packet_drops(packet_drops) {
- net[_id] = this;
- }
- virtual future<raft::snapshot_reply> send_snapshot(raft::server_id id, const raft::install_snapshot& snap, seastar::abort_source& as) {
- if (!net.count(id)) {
- throw std::runtime_error("trying to send a message to an unknown node");
- }
- if (!(*_connected)(id, _id)) {
- throw std::runtime_error("cannot send snapshot since nodes are disconnected");
- }
- (*_snapshots)[id] = (*_snapshots)[_id];
- auto s = snap; // snap is not always held alive by a caller
- if (s.snp.id == delay_send_snapshot) {
- co_await snapshot_sync.wait();
- snapshot_sync.signal();
- }
- co_return co_await net[id]->_client->apply_snapshot(_id, std::move(s));
- }
- virtual future<> send_append_entries(raft::server_id id, const raft::append_request& append_request) {
- if (!net.count(id)) {
- return make_exception_future(std::runtime_error("trying to send a message to an unknown node"));
- }
- if (!(*_connected)(id, _id)) {
- return make_exception_future<>(std::runtime_error("cannot send append since nodes are disconnected"));
- }
- if (!_packet_drops || (rand() % 5)) {
- net[id]->_client->append_entries(_id, append_request);
- }
- return make_ready_future<>();
- }
- virtual void send_append_entries_reply(raft::server_id id, const raft::append_reply& reply) {
- if (!net.count(id)) {
- return;
- }
- if (!(*_connected)(id, _id)) {
- return;
- }
- if (!_packet_drops || (rand() % 5)) {
- net[id]->_client->append_entries_reply(_id, std::move(reply));
- }
- }
- virtual void send_vote_request(raft::server_id id, const raft::vote_request& vote_request) {
- if (!net.count(id)) {
- return;
- }
- if (!(*_connected)(id, _id)) {
- return;
- }
- net[id]->_client->request_vote(_id, std::move(vote_request));
- }
- virtual void send_vote_reply(raft::server_id id, const raft::vote_reply& vote_reply) {
- if (!net.count(id)) {
- return;
- }
- if (!(*_connected)(id, _id)) {
- return;
- }
- net[id]->_client->request_vote_reply(_id, std::move(vote_reply));
- }
- virtual void send_timeout_now(raft::server_id id, const raft::timeout_now& timeout_now) {
- if (!net.count(id)) {
- return;
- }
- if (!(*_connected)(id, _id)) {
- return;
- }
- net[id]->_client->timeout_now_request(_id, std::move(timeout_now));
- }
- virtual void add_server(raft::server_id id, bytes node_info) {
- _known_peers.insert(raft::server_address{id});
- ++_servers_added;
- }
- virtual void remove_server(raft::server_id id) {
- _known_peers.erase(raft::server_address{id});
- ++_servers_removed;
- }
- virtual future<> abort() { return make_ready_future<>(); }
- static void reset_network() {
- net.clear();
- }
-
- const raft::server_address_set& known_peers() const {
- return _known_peers;
- }
- void reset_counters() {
- _servers_added = 0;
- _servers_removed = 0;
- }
- uint32_t servers_added() const {
- return _servers_added;
- }
- uint32_t servers_removed() const {
- return _servers_removed;
- }
-};
-
-std::unordered_map<raft::server_id, raft_cluster::rpc*> raft_cluster::rpc::net;
-
-raft_cluster::test_server raft_cluster::create_server(size_t id, initial_state state) {
-
- auto uuid = to_raft_id(id);
- auto sm = std::make_unique<state_machine>(uuid, _apply, _apply_entries, _snapshots.get());
- auto& rsm = *sm;
-
- auto mrpc = std::make_unique<raft_cluster::rpc>(uuid, _connected.get(),
- _snapshots.get(), _packet_drops);
- auto& rpc_ref = *mrpc;
-
- auto mpersistence = std::make_unique<persistence>(uuid, state,
- _snapshots.get(), _persisted_snapshots.get());
- auto fd = seastar::make_shared<failure_detector>(uuid, _connected.get());
-
- auto raft = raft::create_server(uuid, std::move(mrpc), std::move(sm), std::move(mpersistence),
- std::move(fd), state.server_config);
-
- return {
- std::move(raft),
- &rsm,
- &rpc_ref
- };
-}
-
-raft_cluster::raft_cluster(test_case test,
- apply_fn apply,
- size_t apply_entries, size_t first_val, size_t first_leader,
- bool prevote, bool packet_drops) :
- _connected(std::make_unique<struct connected>(test.nodes)),
- _snapshots(std::make_unique<snapshots>()),
- _persisted_snapshots(std::make_unique<persisted_snapshots>()),
- _apply_entries(apply_entries),
- _next_val(first_val),
- _packet_drops(packet_drops),
- _prevote(prevote),
- _apply(apply),
- _leader(first_leader) {
-
- rpc::reset_network();
-
- auto states = get_states(test, prevote);
- for (size_t s = 0; s < states.size(); ++s) {
- _in_configuration.insert(s);
- }
-
- raft::configuration config;
-
- for (size_t i = 0; i < states.size(); i++) {
- states[i].address = raft::server_address{to_raft_id(i)};
- config.current.emplace(states[i].address);
- }
-
- for (size_t i = 0; i < states.size(); i++) {
- auto& s = states[i].address;
- states[i].snapshot.config = config;
- (*_snapshots)[s.id] = states[i].snp_value;
- _servers.emplace_back(create_server(i, states[i]));
- }
-}
-
-future<> raft_cluster::stop_server(size_t id) {
- cancel_ticker(id);
- co_await _servers[id].server->abort();
- _snapshots->erase(to_raft_id(id));
- _persisted_snapshots->erase(to_raft_id(id));
-}
-
-// Reset previously stopped server
-future<> raft_cluster::reset_server(size_t id, initial_state state) {
- _servers[id] = create_server(id, state);
- co_await _servers[id].server->start();
- set_ticker_callback(id);
-}
-
-future<> raft_cluster::start_all() {
- co_await parallel_for_each(_servers, [] (auto& r) {
- return r.server->start();
- });
- init_raft_tickers();
- BOOST_TEST_MESSAGE("Electing first leader " << _leader);
- _servers[_leader].server->wait_until_candidate();
- co_await _servers[_leader].server->wait_election_done();
-}
-
-future<> raft_cluster::stop_all() {
- for (auto s: _in_configuration) {
- co_await stop_server(s);
- };
-}
-
-future<> raft_cluster::wait_all() {
- for (auto s: _in_configuration) {
- co_await _servers[s].sm->done();
- }
-}
-
-void raft_cluster::tick_all() {
- for (auto s: _in_configuration) {
- _servers[s].server->tick();
- }
-}
-
-void raft_cluster::disconnect(size_t id, std::optional<raft::server_id> except) {
- _connected->disconnect(to_raft_id(id), except);
-}
-
-void raft_cluster::connect_all() {
- _connected->connect_all();
-}
-
-// Add consecutive integer entries to a leader
-future<> raft_cluster::add_entries(size_t n) {
- size_t end = _next_val + n;
- while (_next_val != end) {
- try {
- co_await _servers[_leader].server->add_entry(create_command(_next_val), raft::wait_type::committed);
- _next_val++;
- } catch (raft::not_a_leader& e) {
- // leader stepped down, update with new leader if present
- if (e.leader != raft::server_id{}) {
- _leader = to_local_id(e.leader.id);
- }
- } catch (raft::commit_status_unknown& e) {
- } catch (raft::dropped_entry& e) {
- // retry if an entry is dropped because the leader has changed after it was submitted
- }
- }
-}
-
-future<> raft_cluster::add_remaining_entries() {
- co_await add_entries(_apply_entries - _next_val);
-}
-
-void raft_cluster::init_raft_tickers() {
- _tickers.resize(_servers.size());
- // Only start tickers for servers in configuration
- for (auto s: _in_configuration) {
- _tickers[s].arm_periodic(tick_delta);
- _tickers[s].set_callback([&, s] {
- _servers[s].server->tick();
- });
- }
-}
-
-void raft_cluster::pause_tickers() {
- for (auto s: _in_configuration) {
- _tickers[s].cancel();
- }
-}
-
-void raft_cluster::restart_tickers() {
- for (auto s: _in_configuration) {
- _tickers[s].rearm_periodic(tick_delta);
- }
-}
-
-void raft_cluster::cancel_ticker(size_t id) {
- _tickers[id].cancel();
-}
-
-void raft_cluster::set_ticker_callback(size_t id) noexcept {
- _tickers[id].set_callback([&, id] {
- _servers[id].server->tick();
- });
-}
-
-std::vector<raft::log_entry> create_log(std::vector<log_entry> list, unsigned start_idx) {
- std::vector<raft::log_entry> log;
-
- unsigned i = start_idx;
- for (auto e : list) {
- if (std::holds_alternative<int>(e.data)) {
- log.push_back(raft::log_entry{raft::term_t(e.term), raft::index_t(i++),
- create_command(std::get<int>(e.data))});
- } else {
- log.push_back(raft::log_entry{raft::term_t(e.term), raft::index_t(i++),
- std::get<raft::configuration>(e.data)});
- }
- }
-
- return log;
-}
-
-size_t apply_changes(raft::server_id id, const std::vector<raft::command_cref>& commands,
- lw_shared_ptr<hasher_int> hasher) {
- size_t entries = 0;
- tlogger.debug("sm::apply_changes[{}] got {} entries", id, commands.size());
-
- for (auto&& d : commands) {
- auto is = ser::as_input_stream(d);
- int n = ser::deserialize(is, boost::type<int>());
- if (n != dummy_command) {
- entries++;
- hasher->update(n); // running hash (values and snapshots)
- tlogger.debug("{}: apply_changes {}", id, n);
- }
- }
- return entries;
-};
-
-// Wait for leader log to propagate to follower
-future<> raft_cluster::wait_log(size_t follower) {
- if ((*_connected)(to_raft_id(_leader), to_raft_id(follower)) &&
- _in_configuration.contains(_leader) && _in_configuration.contains(follower)) {
- auto leader_log_idx_term = _servers[_leader].server->log_last_idx_term();
- co_await _servers[follower].server->wait_log_idx_term(leader_log_idx_term);
- }
-}
-
-// Wait for leader log to propagate to specified followers
-future<> raft_cluster::wait_log(::wait_log followers) {
- auto leader_log_idx_term = _servers[_leader].server->log_last_idx_term();
- for (auto s: followers.local_ids) {
- co_await _servers[s].server->wait_log_idx_term(leader_log_idx_term);
- }
-}
-
-// Wait for all connected followers to catch up
-future<> raft_cluster::wait_log_all() {
- auto leader_log_idx_term = _servers[_leader].server->log_last_idx_term();
- for (size_t s = 0; s < _servers.size(); ++s) {
- if (s != _leader && (*_connected)(to_raft_id(s), to_raft_id(_leader)) &&
- _in_configuration.contains(s)) {
- co_await _servers[s].server->wait_log_idx_term(leader_log_idx_term);
- }
- }
-}
-
-void raft_cluster::elapse_elections() {
- for (auto s: _in_configuration) {
- _servers[s].server->elapse_election();
- }
-}
-
-future<> raft_cluster::elect_new_leader(size_t new_leader) {
- BOOST_CHECK_MESSAGE(new_leader < _servers.size(),
- format("Wrong next leader value {}", new_leader));
-
- if (new_leader == _leader) {
- co_return;
- }
-
- // Prevote prevents dueling candidate from bumping up term
- // but in corner cases it needs a loop to retry.
- // With prevote we need our candidate to retry bumping term
- // and waiting log on every loop.
- if (_prevote) {
- bool both_connected = (*_connected)(to_raft_id(_leader), to_raft_id(new_leader));
- if (both_connected) {
- co_await wait_log(new_leader);
- }
-
- pause_tickers();
- // Leader could be already partially disconnected, save current connectivity state
- struct connected prev_disconnected = *_connected;
- // Disconnect current leader from everyone
- _connected->disconnect(to_raft_id(_leader));
- // Move all nodes past the election threshold, also making the old leader a follower
- elapse_elections();
-
- do {
- // Consume leader output messages since a stray append might make new leader step down
- co_await later(); // yield
- _servers[new_leader].server->wait_until_candidate();
-
- if (both_connected) {
- // Allow old leader to vote for new candidate while not looking alive to others
- // Re-connect old leader
- _connected->connect(to_raft_id(_leader));
- // Disconnect old leader from all nodes except new leader
- _connected->disconnect(to_raft_id(_leader), to_raft_id(new_leader));
- }
- co_await _servers[new_leader].server->wait_election_done();
-
- if (both_connected) {
- // Re-disconnect leader for next loop
- _connected->disconnect(to_raft_id(_leader));
- }
- } while (!_servers[new_leader].server->is_leader());
-
- // Restore connections to the original setting
- *_connected = prev_disconnected;
- restart_tickers();
- co_await wait_log_all();
-
- } else { // not prevote
-
- do {
- if ((*_connected)(to_raft_id(_leader), to_raft_id(new_leader))) {
- co_await wait_log(new_leader);
- }
-
- pause_tickers();
- // Leader could be already partially disconnected, save current connectivity state
- struct connected prev_disconnected = *_connected;
- // Disconnect current leader from everyone
- _connected->disconnect(to_raft_id(_leader));
- // Move all nodes past the election threshold, also making the old leader a follower
- elapse_elections();
- // Consume leader output messages since a stray append might make new leader step down
- co_await later(); // yield
- _servers[new_leader].server->wait_until_candidate();
- // Re-connect old leader
- _connected->connect(to_raft_id(_leader));
- // Disconnect old leader from all nodes except new leader
- _connected->disconnect(to_raft_id(_leader), to_raft_id(new_leader));
- restart_tickers();
- co_await _servers[new_leader].server->wait_election_done();
-
- // Restore connections to the original setting
- *_connected = prev_disconnected;
- } while (!_servers[new_leader].server->is_leader());
- }
-
- tlogger.debug("confirmed leader on {}", to_raft_id(new_leader));
- _leader = new_leader;
-}
-
-// Run a free election of nodes in configuration
-// NOTE: there should be enough nodes capable of participating
-future<> raft_cluster::free_election() {
- tlogger.debug("Running free election");
- elapse_elections();
- size_t node = 0;
- for (;;) {
- tick_all();
- co_await seastar::sleep(10us); // Wait for election rpc exchanges
- // find if we have a leader
- for (auto s: _in_configuration) {
- if (_servers[s].server->is_leader()) {
- tlogger.debug("New leader {}", s);
- _leader = s;
- co_return;
- }
- }
- }
-}
-
-future<> raft_cluster::change_configuration(set_config sc) {
- BOOST_CHECK_MESSAGE(sc.size() > 0, "Empty configuration change not supported");
- raft::server_address_set set;
- std::unordered_set<size_t> new_config;
- for (auto s: sc) {
- new_config.insert(s.node_idx);
- auto addr = to_server_address(s.node_idx);
- addr.can_vote = s.can_vote;
- set.insert(std::move(addr));
- BOOST_CHECK_MESSAGE(s.node_idx < _servers.size(),
- format("Configuration element {} past node limit {}", s.node_idx, _servers.size() - 1));
- }
- BOOST_CHECK_MESSAGE(new_config.contains(_leader) || sc.size() < (_servers.size()/2 + 1),
- "New configuration without old leader and below quorum size (no election)");
-
- if (!new_config.contains(_leader)) {
- // Wait log on all nodes in new config before change
- for (auto s: sc) {
- co_await wait_log(s.node_idx);
- }
- }
-
- // Start nodes in new configuration but not in current configuration (re-added)
- for (auto s: new_config) {
- if (!_in_configuration.contains(s)) {
- tlogger.debug("Starting node being re-added to configuration {}", s);
- co_await reset_server(s, initial_state{.log = {}});
- _tickers[s].rearm_periodic(tick_delta);
- }
- }
-
- tlogger.debug("Changing configuration on leader {}", _leader);
- co_await _servers[_leader].server->set_configuration(std::move(set));
-
- if (!new_config.contains(_leader)) {
- co_await free_election();
- }
-
- // Now we know joint configuration was applied
- // Add a dummy entry to confirm new configuration was committed
- try {
- co_await _servers[_leader].server->add_entry(create_command(dummy_command),
- raft::wait_type::committed);
- } catch (raft::not_a_leader& e) {
- // leader stepped down, implying config fully changed
- } catch (raft::commit_status_unknown& e) {}
-
- // Stop nodes no longer in configuration
- for (auto s: _in_configuration) {
- if (!new_config.contains(s)) {
- _tickers[s].cancel();
- co_await stop_server(s);
- }
- }
-
- _in_configuration = new_config;
-}
-
-future<> raft_cluster::check_rpc_config(::check_rpc_config cc) {
- auto as = address_set(cc.addrs);
- for (auto& node: cc.nodes) {
- BOOST_CHECK(node.id < _servers.size());
- co_await seastar::async([&] {
- CHECK_EVENTUALLY_EQUAL(_servers[node.id].rpc->known_peers(), as);
- });
- }
-}
-
-void raft_cluster::check_rpc_added(::check_rpc_added expected) const {
- for (auto node: expected.nodes) {
- BOOST_CHECK_MESSAGE(_servers[node.id].rpc->servers_added() == expected.expected,
- format("RPC added {} does not match expected {}",
- _servers[node.id].rpc->servers_added(), expected.expected));
- }
-}
-
-void raft_cluster::check_rpc_removed(::check_rpc_removed expected) const {
- for (auto node: expected.nodes) {
- BOOST_CHECK_MESSAGE(_servers[node.id].rpc->servers_removed() == expected.expected,
- format("RPC removed {} does not match expected {}",
- _servers[node.id].rpc->servers_removed(), expected.expected));
- }
-}
-
-void raft_cluster::rpc_reset_counters(::rpc_reset_counters nodes) {
- for (auto node: nodes) {
- _servers[node.id].rpc->reset_counters();
- }
-}
-
-future<> raft_cluster::reconfigure_all() {
- if (_in_configuration.size() < _servers.size()) {
- set_config sc;
- for (size_t s = 0; s < _servers.size(); ++s) {
- sc.push_back(s);
- }
- co_await change_configuration(std::move(sc));
- }
-}
-
-future<> raft_cluster::partition(::partition p) {
- std::unordered_set<size_t> partition_servers;
- std::optional<size_t> next_leader;
- for (auto s: p) {
- size_t id;
- if (std::holds_alternative<struct leader>(s)) {
- next_leader = std::get<struct leader>(s).id;
- id = *next_leader;
- } else {
- id = std::get<int>(s);
- }
- partition_servers.insert(id);
- }
- if (next_leader) {
- // Wait for log to propagate to next leader, before disconnections
- co_await wait_log(*next_leader);
- } else {
- // No leader specified, wait log for all connected servers, before disconnections
- for (auto s: partition_servers) {
- if (_in_configuration.contains(s)) {
- co_await wait_log(s);
- }
- }
- }
- pause_tickers();
- _connected->connect_all();
- // NOTE: connectivity is independent of configuration so it's for all servers
- for (size_t s = 0; s < _servers.size(); ++s) {
- if (partition_servers.find(s) == partition_servers.end()) {
- // Disconnect servers not in main partition
- _connected->disconnect(to_raft_id(s));
- }
- }
- if (next_leader) {
- // New leader specified, elect it
- co_await elect_new_leader(*next_leader);
- } else if (partition_servers.find(_leader) == partition_servers.end() && p.size() > 0) {
- // Old leader disconnected and not specified new, free election
- co_await free_election();
- }
- restart_tickers();
-}
-
-future<> raft_cluster::tick(::tick t) {
- for (uint64_t i = 0; i < t.ticks; i++) {
- for (auto&& s: _servers) {
- s.server->tick();
- }
- co_await later();
- }
-}
-
-future<> raft_cluster::stop(::stop server) {
- co_await stop_server(server.id);
-}
-
-future<> raft_cluster::reset(::reset server) {
- co_await reset_server(server.id, server.state);
-}
-
-void raft_cluster::disconnect(::disconnect nodes) {
- _connected->cut(to_raft_id(nodes.first), to_raft_id(nodes.second));
-}
-
-void raft_cluster::verify() {
- BOOST_TEST_MESSAGE("Verifying hashes match expected (snapshot and apply calls)");
- auto expected = hasher_int::hash_range(_apply_entries).finalize_uint64();
- for (auto i: _in_configuration) {
- auto digest = _servers[i].sm->hasher->finalize_uint64();
- BOOST_CHECK_MESSAGE(digest == expected,
- format("Digest doesn't match for server [{}]: {} != {}", i, digest, expected));
- }
-
- BOOST_TEST_MESSAGE("Verifying persisted snapshots");
- // TODO: check that snapshot is taken when it should be
- for (auto& s : (*_persisted_snapshots)) {
- auto& [snp, val] = s.second;
- auto digest = val.hasher.finalize_uint64();
- auto expected = hasher_int::hash_range(val.idx).finalize_uint64();
- BOOST_CHECK_MESSAGE(digest == expected,
- format("Persisted snapshot {} doesn't match {} != {}", snp.id, digest, expected));
- }
-}
-
-std::vector<initial_state> raft_cluster::get_states(test_case test, bool prevote) {
- std::vector<initial_state> states(test.nodes); // Server initial states
-
- size_t leader = test.initial_leader;
-
- states[leader].term = raft::term_t{test.initial_term};
-
- // Server initial logs, etc
- for (size_t i = 0; i < states.size(); ++i) {
- size_t start_idx = 1;
- if (i < test.initial_snapshots.size()) {
- states[i].snapshot = test.initial_snapshots[i].snap;
- states[i].snp_value.hasher = hasher_int::hash_range(test.initial_snapshots[i].snap.idx);
- states[i].snp_value.idx = test.initial_snapshots[i].snap.idx;
- start_idx = states[i].snapshot.idx + 1;
- }
- if (i < test.initial_states.size()) {
- auto state = test.initial_states[i];
- states[i].log = create_log(state.le, start_idx);
- } else {
- states[i].log = {};
- }
- if (i < test.config.size()) {
- states[i].server_config = test.config[i];
- } else {
- states[i].server_config = { .enable_prevoting = prevote };
- }
- }
- return states;
-}
-
-future<> run_test(test_case test, bool prevote, bool packet_drops) {
-
- raft_cluster rafts(test, apply_changes, test.total_values,
- test.get_first_val(), test.initial_leader, prevote, packet_drops);
- co_await rafts.start_all();
-
- BOOST_TEST_MESSAGE("Processing updates");
-
- // Process all updates in order
- for (auto update: test.updates) {
- co_await std::visit(make_visitor(
- [&rafts] (entries update) -> future<> {
- co_await rafts.add_entries(update.n);
- },
- [&rafts] (new_leader update) -> future<> {
- co_await rafts.elect_new_leader(update.id);
- },
- [&rafts] (disconnect update) -> future<> {
- rafts.disconnect(update);
- co_return;
- },
- [&rafts] (partition update) -> future<> {
- co_await rafts.partition(update);
- },
- [&rafts] (stop update) -> future<> {
- co_await rafts.stop(update);
- },
- [&rafts] (reset update) -> future<> {
- co_await rafts.reset(update);
- },
- [&rafts] (wait_log update) -> future<> {
- co_await rafts.wait_log(update);
- },
- [&rafts] (set_config update) -> future<> {
- co_await rafts.change_configuration(update);
- },
- [&rafts] (check_rpc_config update) -> future<> {
- co_await rafts.check_rpc_config(update);
- },
- [&rafts] (check_rpc_added update) -> future<> {
- rafts.check_rpc_added(update);
- co_return;
- },
- [&rafts] (check_rpc_removed update) -> future<> {
- rafts.check_rpc_removed(update);
- co_return;
- },
- [&rafts] (rpc_reset_counters update) -> future<> {
- rafts.rpc_reset_counters(update);
- co_return;
- },
- [&rafts] (tick update) -> future<> {
- co_await rafts.tick(update);
- }
- ), std::move(update));
- }
-
- // Reconnect and bring all nodes back into configuration, if needed
- rafts.connect_all();
- co_await rafts.reconfigure_all();
-
- if (test.total_values > 0) {
- BOOST_TEST_MESSAGE("Appending remaining values");
- co_await rafts.add_remaining_entries();
- co_await rafts.wait_all();
- }
-
- co_await rafts.stop_all();
-
- if (test.total_values > 0) {
- rafts.verify();
- }
-}
-
-void replication_test(struct test_case test, bool prevote, bool packet_drops) {
- run_test(std::move(test), prevote, packet_drops).get();
-}

#define RAFT_TEST_CASE(test_name, test_body) \
SEASTAR_THREAD_TEST_CASE(test_name) { \
--
2.31.1

Alejo Sanchez

Jul 16, 2021, 11:58:55 AM
to scylla...@googlegroups.com, kos...@scylladb.com, gl...@scylladb.com, tgra...@scylladb.com, Alejo Sanchez
Store the tick delta inside raft_cluster so each test can choose its own tick rate.
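
For illustration, the resulting call shape (a minimal sketch; the test body
here is hypothetical, only the parameter passing matches this patch):

    // tick_delta becomes a parameter instead of a file-scope constant,
    // so each test binary can choose its own tick rate.
    lowres_clock::duration tick_delta = 1ms;

    // Hypothetical test body; tick_delta is forwarded to raft_cluster.
    replication_test(test_case{.nodes = 3, .updates = {entries{10}}},
                     false /* prevote */, false /* packet_drops */, tick_delta);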

Signed-off-by: Alejo Sanchez <alejo....@scylladb.com>
---
test/raft/replication.hh | 27 +++++++++++++++------------
test/raft/replication_test.cc | 17 ++++++++++-------
2 files changed, 25 insertions(+), 19 deletions(-)

diff --git a/test/raft/replication.hh b/test/raft/replication.hh
index 902c990880..3355d68181 100644
--- a/test/raft/replication.hh
+++ b/test/raft/replication.hh
@@ -77,8 +77,6 @@ using namespace std::placeholders;

static seastar::logger tlogger("test");

-lowres_clock::duration tick_delta = 1ms;
-
auto dummy_command = std::numeric_limits<int>::min();

class hasher_int : public xx_hasher {
@@ -303,11 +301,12 @@ class raft_cluster {
std::vector<seastar::timer<lowres_clock>> _tickers;
size_t _leader;
std::vector<initial_state> get_states(test_case test, bool prevote);
+ lowres_clock::duration _tick_delta;
public:
raft_cluster(test_case test,
apply_fn apply,
size_t apply_entries, size_t first_val, size_t first_leader,
- bool prevote, bool packet_drops);
+ bool prevote, bool packet_drops, lowres_clock::duration tick_delta);
// No copy
raft_cluster(const raft_cluster&) = delete;
raft_cluster(raft_cluster&&) = default;
@@ -640,7 +639,7 @@ raft_cluster::test_server raft_cluster::create_server(size_t id, initial_state s
raft_cluster::raft_cluster(test_case test,
apply_fn apply,
size_t apply_entries, size_t first_val, size_t first_leader,
- bool prevote, bool packet_drops) :
+ bool prevote, bool packet_drops, lowres_clock::duration tick_delta) :
_connected(std::make_unique<struct connected>(test.nodes)),
_snapshots(std::make_unique<snapshots>()),
_persisted_snapshots(std::make_unique<persisted_snapshots>()),
@@ -649,7 +648,8 @@ raft_cluster::raft_cluster(test_case test,
_packet_drops(packet_drops),
_prevote(prevote),
_apply(apply),
- _leader(first_leader) {
+ _leader(first_leader),
+ _tick_delta(tick_delta) {

rpc::reset_network();

@@ -750,7 +750,7 @@ void raft_cluster::init_raft_tickers() {
_tickers.resize(_servers.size());
// Only start tickers for servers in configuration
for (auto s: _in_configuration) {
- _tickers[s].arm_periodic(tick_delta);
+ _tickers[s].arm_periodic(_tick_delta);
_tickers[s].set_callback([&, s] {
_servers[s].server->tick();
});
@@ -765,7 +765,7 @@ void raft_cluster::pause_tickers() {

void raft_cluster::restart_tickers() {
for (auto s: _in_configuration) {
- _tickers[s].rearm_periodic(tick_delta);
+ _tickers[s].rearm_periodic(_tick_delta);
}
}

@@ -978,7 +978,7 @@ future<> raft_cluster::change_configuration(set_config sc) {
if (!_in_configuration.contains(s)) {
tlogger.debug("Starting node being re-added to configuration {}", s);
co_await reset_server(s, initial_state{.log = {}});
- _tickers[s].rearm_periodic(tick_delta);
+ _tickers[s].rearm_periodic(_tick_delta);
}
}

@@ -1166,10 +1166,12 @@ std::vector<initial_state> raft_cluster::get_states(test_case test, bool prevote
return states;
}

-future<> run_test(test_case test, bool prevote, bool packet_drops) {
+future<> run_test(test_case test, bool prevote, bool packet_drops,
+ lowres_clock::duration tick_delta) {

raft_cluster rafts(test, apply_changes, test.total_values,
- test.get_first_val(), test.initial_leader, prevote, packet_drops);
+ test.get_first_val(), test.initial_leader, prevote, packet_drops,
+ tick_delta);
co_await rafts.start_all();

BOOST_TEST_MESSAGE("Processing updates");
@@ -1240,6 +1242,7 @@ future<> run_test(test_case test, bool prevote, bool packet_drops) {
}
}

-void replication_test(struct test_case test, bool prevote, bool packet_drops) {
- run_test(std::move(test), prevote, packet_drops).get();
+void replication_test(struct test_case test, bool prevote, bool packet_drops,
+ lowres_clock::duration tick_delta) {
+ run_test(std::move(test), prevote, packet_drops, tick_delta).get();
}
diff --git a/test/raft/replication_test.cc b/test/raft/replication_test.cc
index 4bdbb133ce..ad6d7b1445 100644
--- a/test/raft/replication_test.cc
+++ b/test/raft/replication_test.cc
@@ -23,15 +23,18 @@

// Test Raft library with declarative test definitions

+
+lowres_clock::duration tick_delta = 1ms;
+
#define RAFT_TEST_CASE(test_name, test_body) \
SEASTAR_THREAD_TEST_CASE(test_name) { \
- replication_test(test_body, false, false); } \
+ replication_test(test_body, false, false, tick_delta); } \
SEASTAR_THREAD_TEST_CASE(test_name ## _drops) { \
- replication_test(test_body, false, true); } \
+ replication_test(test_body, false, true, tick_delta); } \
SEASTAR_THREAD_TEST_CASE(test_name ## _prevote) { \
- replication_test(test_body, true, false); } \
+ replication_test(test_body, true, false, tick_delta); } \
SEASTAR_THREAD_TEST_CASE(test_name ## _prevote_drops) { \
- replication_test(test_body, true, true); }
+ replication_test(test_body, true, true, tick_delta); }

// 1 nodes, simple replication, empty, no updates
RAFT_TEST_CASE(simple_replication, (test_case{
@@ -203,7 +206,7 @@ SEASTAR_THREAD_TEST_CASE(test_take_snapshot_and_stream) {
{.nodes = 3,
.config = {{.snapshot_threshold = 10, .snapshot_trailing = 5}},
.updates = {entries{5}, partition{0,1}, entries{10}, partition{0, 2}, entries{20}}}
- , false, false);
+ , false, false, tick_delta);
}

// Check removing all followers, add entry, bring back one follower and make it leader
@@ -228,7 +231,7 @@ SEASTAR_THREAD_TEST_CASE(remove_node_cycle) {
// TODO: find out why it breaks in release mode
// set_config{3,0,1}, entries{2}, new_leader{0}
}}
- , false, false);
+ , false, false, tick_delta);
}

SEASTAR_THREAD_TEST_CASE(test_leader_change_during_snapshot_transfere) {
@@ -241,7 +244,7 @@ SEASTAR_THREAD_TEST_CASE(test_leader_change_during_snapshot_transfere) {
.term = raft::term_t(1),
.id = delay_apply_snapshot}}},
.updates = {tick{10} /* ticking starts snapshot transfer */, new_leader{1}, entries{10}}}
- , false, false);
+ , false, false, tick_delta);
}

// verifies that each node in a cluster can campaign
--
2.31.1

Alejo Sanchez

Jul 16, 2021, 11:58:57 AM
to scylla...@googlegroups.com, kos...@scylladb.com, gl...@scylladb.com, tgra...@scylladb.com, Alejo Sanchez
Templatize the clock type.

Use a struct for run_test to work around
https://bugs.llvm.org/show_bug.cgi?id=50345

With help from @kbr-
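
The essence of the change, reduced to a sketch (ticker_demo is a made-up
name; the real changes to raft_cluster are in the diff below):

    // Timers and durations follow the Clock template parameter instead
    // of a hard-coded lowres_clock.
    template <typename Clock>
    class ticker_demo {
        seastar::timer<Clock> _ticker;
        typename Clock::duration _tick_delta;
    public:
        explicit ticker_demo(typename Clock::duration d) : _tick_delta(d) {}
        void start(std::function<void()> cb) {
            _ticker.set_callback(std::move(cb));
            _ticker.arm_periodic(_tick_delta);
        }
    };

lowres_clock keeps the existing tests cheap, while steady_clock_type allows
the realistic timing needed by the many nodes test.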

Signed-off-by: Alejo Sanchez <alejo....@scylladb.com>
---
test/raft/replication.hh | 283 ++++++++++++++++++++--------------
test/raft/replication_test.cc | 15 +-
2 files changed, 173 insertions(+), 125 deletions(-)

diff --git a/test/raft/replication.hh b/test/raft/replication.hh
index 3355d68181..35f647f4e6 100644
--- a/test/raft/replication.hh
+++ b/test/raft/replication.hh
@@ -276,6 +276,7 @@ raft::snapshot_id delay_apply_snapshot{utils::UUID(0, 0xdeadbeaf)};
// sending of a snapshot with that id will be delayed until snapshot_sync is signaled
raft::snapshot_id delay_send_snapshot{utils::UUID(0xdeadbeaf, 0)};

+template <typename Clock>
class raft_cluster {
using apply_fn = std::function<size_t(raft::server_id id, const std::vector<raft::command_cref>& commands, lw_shared_ptr<hasher_int> hasher)>;
class state_machine;
@@ -298,15 +299,16 @@ class raft_cluster {
bool _prevote;
apply_fn _apply;
std::unordered_set<size_t> _in_configuration; // Servers in current configuration
- std::vector<seastar::timer<lowres_clock>> _tickers;
+ std::vector<seastar::timer<Clock>> _tickers;
size_t _leader;
std::vector<initial_state> get_states(test_case test, bool prevote);
- lowres_clock::duration _tick_delta;
+ typename Clock::duration _tick_delta;
public:
raft_cluster(test_case test,
apply_fn apply,
size_t apply_entries, size_t first_val, size_t first_leader,
- bool prevote, bool packet_drops, lowres_clock::duration tick_delta);
+ bool prevote, bool packet_drops,
+ typename Clock::duration tick_delta);
// No copy
raft_cluster(const raft_cluster&) = delete;
raft_cluster(raft_cluster&&) = default;
@@ -350,7 +352,8 @@ class raft_cluster {
test_server create_server(size_t id, initial_state state);
};

-class raft_cluster::state_machine : public raft::state_machine {
+template <typename Clock>
+class raft_cluster<Clock>::state_machine : public raft::state_machine {
raft::server_id _id;
apply_fn _apply;
size_t _apply_entries;
@@ -402,7 +405,8 @@ class raft_cluster::state_machine : public raft::state_machine {
}
};

-class raft_cluster::persistence : public raft::persistence {
+template <typename Clock>
+class raft_cluster<Clock>::persistence : public raft::persistence {
raft::server_id _id;
initial_state _conf;
snapshots* _snapshots;
@@ -438,7 +442,8 @@ class raft_cluster::persistence : public raft::persistence {
virtual future<> abort() { return make_ready_future<>(); }
};

-struct raft_cluster::connected {
+template <typename Clock>
+struct raft_cluster<Clock>::connected {
struct connection {
raft::server_id from;
raft::server_id to;
@@ -492,7 +497,8 @@ struct raft_cluster::connected {
}
};

-class raft_cluster::failure_detector : public raft::failure_detector {
+template <typename Clock>
+class raft_cluster<Clock>::failure_detector : public raft::failure_detector {
raft::server_id _id;
connected* _connected;
public:
@@ -502,7 +508,8 @@ class raft_cluster::failure_detector : public raft::failure_detector {
}
};

-class raft_cluster::rpc : public raft::rpc {
+template <typename Clock>
+class raft_cluster<Clock>::rpc : public raft::rpc {
static std::unordered_map<raft::server_id, rpc*> net;
raft::server_id _id;
connected* _connected;
@@ -610,9 +617,11 @@ class raft_cluster::rpc : public raft::rpc {
}
};

-std::unordered_map<raft::server_id, raft_cluster::rpc*> raft_cluster::rpc::net;
+template <typename Clock>
+std::unordered_map<raft::server_id, typename raft_cluster<Clock>::rpc*> raft_cluster<Clock>::rpc::net;

-raft_cluster::test_server raft_cluster::create_server(size_t id, initial_state state) {
+template <typename Clock>
+typename raft_cluster<Clock>::test_server raft_cluster<Clock>::create_server(size_t id, initial_state state) {

auto uuid = to_raft_id(id);
auto sm = std::make_unique<state_machine>(uuid, _apply, _apply_entries, _snapshots.get());
@@ -636,10 +645,12 @@ raft_cluster::test_server raft_cluster::create_server(size_t id, initial_state s
};
}

-raft_cluster::raft_cluster(test_case test,
+template <typename Clock>
+raft_cluster<Clock>::raft_cluster(test_case test,
apply_fn apply,
size_t apply_entries, size_t first_val, size_t first_leader,
- bool prevote, bool packet_drops, lowres_clock::duration tick_delta) :
+ bool prevote, bool packet_drops,
+ typename Clock::duration tick_delta) :
_connected(std::make_unique<struct connected>(test.nodes)),
_snapshots(std::make_unique<snapshots>()),
_persisted_snapshots(std::make_unique<persisted_snapshots>()),
@@ -673,7 +684,8 @@ raft_cluster::raft_cluster(test_case test,
}
}

-future<> raft_cluster::stop_server(size_t id) {
+template <typename Clock>
+future<> raft_cluster<Clock>::stop_server(size_t id) {
cancel_ticker(id);
co_await _servers[id].server->abort();
_snapshots->erase(to_raft_id(id));
@@ -681,13 +693,15 @@ future<> raft_cluster::stop_server(size_t id) {
}

// Reset previously stopped server
-future<> raft_cluster::reset_server(size_t id, initial_state state) {
+template <typename Clock>
+future<> raft_cluster<Clock>::reset_server(size_t id, initial_state state) {
_servers[id] = create_server(id, state);
co_await _servers[id].server->start();
set_ticker_callback(id);
}

-future<> raft_cluster::start_all() {
+template <typename Clock>
+future<> raft_cluster<Clock>::start_all() {
co_await parallel_for_each(_servers, [] (auto& r) {
return r.server->start();
});
@@ -697,34 +711,40 @@ future<> raft_cluster::start_all() {
co_await _servers[_leader].server->wait_election_done();
}

-future<> raft_cluster::stop_all() {
+template <typename Clock>
+future<> raft_cluster<Clock>::stop_all() {
for (auto s: _in_configuration) {
co_await stop_server(s);
};
}

-future<> raft_cluster::wait_all() {
+template <typename Clock>
+future<> raft_cluster<Clock>::wait_all() {
for (auto s: _in_configuration) {
co_await _servers[s].sm->done();
}
}

-void raft_cluster::tick_all() {
+template <typename Clock>
+void raft_cluster<Clock>::tick_all() {
for (auto s: _in_configuration) {
_servers[s].server->tick();
}
}

-void raft_cluster::disconnect(size_t id, std::optional<raft::server_id> except) {
+template <typename Clock>
+void raft_cluster<Clock>::disconnect(size_t id, std::optional<raft::server_id> except) {
_connected->disconnect(to_raft_id(id), except);
}

-void raft_cluster::connect_all() {
+template <typename Clock>
+void raft_cluster<Clock>::connect_all() {
_connected->connect_all();
}

// Add consecutive integer entries to a leader
-future<> raft_cluster::add_entries(size_t n) {
+template <typename Clock>
+future<> raft_cluster<Clock>::add_entries(size_t n) {
size_t end = _next_val + n;
while (_next_val != end) {
try {
@@ -742,11 +762,13 @@ future<> raft_cluster::add_entries(size_t n) {
}
}

-future<> raft_cluster::add_remaining_entries() {
+template <typename Clock>
+future<> raft_cluster<Clock>::add_remaining_entries() {
co_await add_entries(_apply_entries - _next_val);
}

-void raft_cluster::init_raft_tickers() {
+template <typename Clock>
+void raft_cluster<Clock>::init_raft_tickers() {
_tickers.resize(_servers.size());
// Only start tickers for servers in configuration
for (auto s: _in_configuration) {
@@ -757,23 +779,27 @@ void raft_cluster::init_raft_tickers() {
}
}

-void raft_cluster::pause_tickers() {
+template <typename Clock>
+void raft_cluster<Clock>::pause_tickers() {
for (auto s: _in_configuration) {
_tickers[s].cancel();
}
}

-void raft_cluster::restart_tickers() {
+template <typename Clock>
+void raft_cluster<Clock>::restart_tickers() {
for (auto s: _in_configuration) {
_tickers[s].rearm_periodic(_tick_delta);
}
}

-void raft_cluster::cancel_ticker(size_t id) {
+template <typename Clock>
+void raft_cluster<Clock>::cancel_ticker(size_t id) {
_tickers[id].cancel();
}

-void raft_cluster::set_ticker_callback(size_t id) noexcept {
+template <typename Clock>
+void raft_cluster<Clock>::set_ticker_callback(size_t id) noexcept {
_tickers[id].set_callback([&, id] {
_servers[id].server->tick();
});
@@ -814,7 +840,8 @@ size_t apply_changes(raft::server_id id, const std::vector<raft::command_cref>&
};

// Wait for leader log to propagate to follower
-future<> raft_cluster::wait_log(size_t follower) {
+template <typename Clock>
+future<> raft_cluster<Clock>::wait_log(size_t follower) {
if ((*_connected)(to_raft_id(_leader), to_raft_id(follower)) &&
_in_configuration.contains(_leader) && _in_configuration.contains(follower)) {
auto leader_log_idx_term = _servers[_leader].server->log_last_idx_term();
@@ -823,7 +850,8 @@ future<> raft_cluster::wait_log(size_t follower) {
}

// Wait for leader log to propagate to specified followers
-future<> raft_cluster::wait_log(::wait_log followers) {
+template <typename Clock>
+future<> raft_cluster<Clock>::wait_log(::wait_log followers) {
auto leader_log_idx_term = _servers[_leader].server->log_last_idx_term();
for (auto s: followers.local_ids) {
co_await _servers[s].server->wait_log_idx_term(leader_log_idx_term);
@@ -831,7 +859,8 @@ future<> raft_cluster::wait_log(::wait_log followers) {
}

// Wait for all connected followers to catch up
-future<> raft_cluster::wait_log_all() {
+template <typename Clock>
+future<> raft_cluster<Clock>::wait_log_all() {
auto leader_log_idx_term = _servers[_leader].server->log_last_idx_term();
for (size_t s = 0; s < _servers.size(); ++s) {
if (s != _leader && (*_connected)(to_raft_id(s), to_raft_id(_leader)) &&
@@ -841,13 +870,15 @@ future<> raft_cluster::wait_log_all() {
}
}

-void raft_cluster::elapse_elections() {
+template <typename Clock>
+void raft_cluster<Clock>::elapse_elections() {
for (auto s: _in_configuration) {
_servers[s].server->elapse_election();
}
}

-future<> raft_cluster::elect_new_leader(size_t new_leader) {
+template <typename Clock>
+future<> raft_cluster<Clock>::elect_new_leader(size_t new_leader) {
BOOST_CHECK_MESSAGE(new_leader < _servers.size(),
format("Wrong next leader value {}", new_leader));

@@ -933,7 +964,8 @@ future<> raft_cluster::elect_new_leader(size_t new_leader) {

// Run a free election of nodes in configuration
// NOTE: there should be enough nodes capable of participating
-future<> raft_cluster::free_election() {
+template <typename Clock>
+future<> raft_cluster<Clock>::free_election() {
tlogger.debug("Running free election");
elapse_elections();
size_t node = 0;
@@ -951,7 +983,8 @@ future<> raft_cluster::free_election() {
}
}

-future<> raft_cluster::change_configuration(set_config sc) {
+template <typename Clock>
+future<> raft_cluster<Clock>::change_configuration(set_config sc) {
BOOST_CHECK_MESSAGE(sc.size() > 0, "Empty configuration change not supported");
raft::server_address_set set;
std::unordered_set<size_t> new_config;
@@ -1009,7 +1042,8 @@ future<> raft_cluster::change_configuration(set_config sc) {
_in_configuration = new_config;
}

-future<> raft_cluster::check_rpc_config(::check_rpc_config cc) {
+template <typename Clock>
+future<> raft_cluster<Clock>::check_rpc_config(::check_rpc_config cc) {
auto as = address_set(cc.addrs);
for (auto& node: cc.nodes) {
BOOST_CHECK(node.id < _servers.size());
@@ -1019,7 +1053,8 @@ future<> raft_cluster::check_rpc_config(::check_rpc_config cc) {
}
}

-void raft_cluster::check_rpc_added(::check_rpc_added expected) const {
+template <typename Clock>
+void raft_cluster<Clock>::check_rpc_added(::check_rpc_added expected) const {
for (auto node: expected.nodes) {
BOOST_CHECK_MESSAGE(_servers[node.id].rpc->servers_added() == expected.expected,
format("RPC added {} does not match expected {}",
@@ -1027,7 +1062,8 @@ void raft_cluster::check_rpc_added(::check_rpc_added expected) const {
}
}

-void raft_cluster::check_rpc_removed(::check_rpc_removed expected) const {
+template <typename Clock>
+void raft_cluster<Clock>::check_rpc_removed(::check_rpc_removed expected) const {
for (auto node: expected.nodes) {
BOOST_CHECK_MESSAGE(_servers[node.id].rpc->servers_removed() == expected.expected,
format("RPC removed {} does not match expected {}",
@@ -1035,13 +1071,15 @@ void raft_cluster::check_rpc_removed(::check_rpc_removed expected) const {
}
}

-void raft_cluster::rpc_reset_counters(::rpc_reset_counters nodes) {
+template <typename Clock>
+void raft_cluster<Clock>::rpc_reset_counters(::rpc_reset_counters nodes) {
for (auto node: nodes) {
_servers[node.id].rpc->reset_counters();
}
}

-future<> raft_cluster::reconfigure_all() {
+template <typename Clock>
+future<> raft_cluster<Clock>::reconfigure_all() {
if (_in_configuration.size() < _servers.size()) {
set_config sc;
for (size_t s = 0; s < _servers.size(); ++s) {
@@ -1051,7 +1089,8 @@ future<> raft_cluster::reconfigure_all() {
}
}

-future<> raft_cluster::partition(::partition p) {
+template <typename Clock>
+future<> raft_cluster<Clock>::partition(::partition p) {
std::unordered_set<size_t> partition_servers;
std::optional<size_t> next_leader;
for (auto s: p) {
@@ -1094,7 +1133,8 @@ future<> raft_cluster::partition(::partition p) {
restart_tickers();
}

-future<> raft_cluster::tick(::tick t) {
+template <typename Clock>
+future<> raft_cluster<Clock>::tick(::tick t) {
for (uint64_t i = 0; i < t.ticks; i++) {
for (auto&& s: _servers) {
s.server->tick();
@@ -1103,19 +1143,23 @@ future<> raft_cluster::tick(::tick t) {
}
}

-future<> raft_cluster::stop(::stop server) {
+template <typename Clock>
+future<> raft_cluster<Clock>::stop(::stop server) {
co_await stop_server(server.id);
}

-future<> raft_cluster::reset(::reset server) {
+template <typename Clock>
+future<> raft_cluster<Clock>::reset(::reset server) {
co_await reset_server(server.id, server.state);
}

-void raft_cluster::disconnect(::disconnect nodes) {
+template <typename Clock>
+void raft_cluster<Clock>::disconnect(::disconnect nodes) {
_connected->cut(to_raft_id(nodes.first), to_raft_id(nodes.second));
}

-void raft_cluster::verify() {
+template <typename Clock>
+void raft_cluster<Clock>::verify() {
BOOST_TEST_MESSAGE("Verifying hashes match expected (snapshot and apply calls)");
auto expected = hasher_int::hash_range(_apply_entries).finalize_uint64();
for (auto i: _in_configuration) {
@@ -1135,7 +1179,8 @@ void raft_cluster::verify() {
}
}

-std::vector<initial_state> raft_cluster::get_states(test_case test, bool prevote) {
+template <typename Clock>
+std::vector<initial_state> raft_cluster<Clock>::get_states(test_case test, bool prevote) {
std::vector<initial_state> states(test.nodes); // Server initial states

size_t leader = test.initial_leader;
@@ -1166,83 +1211,87 @@ std::vector<initial_state> raft_cluster::get_states(test_case test, bool prevote
return states;
}

-future<> run_test(test_case test, bool prevote, bool packet_drops,
- lowres_clock::duration tick_delta) {
-
- raft_cluster rafts(test, apply_changes, test.total_values,
- test.get_first_val(), test.initial_leader, prevote, packet_drops,
- tick_delta);
+template <typename Clock>
+struct run_test {
+ future<> operator() (test_case test, bool prevote, bool packet_drops,
+ typename Clock::duration tick_delta) {
+
+ raft_cluster<Clock> rafts(test, ::apply_changes, test.total_values,
+ test.get_first_val(), test.initial_leader, prevote, packet_drops,
+ tick_delta);
+ co_await rafts.start_all();
+
+ BOOST_TEST_MESSAGE("Processing updates");
+
+ // Process all updates in order
+ for (auto update: test.updates) {
+ co_await std::visit(make_visitor(
+ [&rafts] (entries update) -> future<> {
+ co_await rafts.add_entries(update.n);
+ },
+ [&rafts] (new_leader update) -> future<> {
+ co_await rafts.elect_new_leader(update.id);
+ },
+ [&rafts] (disconnect update) -> future<> {
+ rafts.disconnect(update);
+ co_return;
+ },
+ [&rafts] (partition update) -> future<> {
+ co_await rafts.partition(update);
+ },
+ [&rafts] (stop update) -> future<> {
+ co_await rafts.stop(update);
+ },
+ [&rafts] (reset update) -> future<> {
+ co_await rafts.reset(update);
+ },
+ [&rafts] (wait_log update) -> future<> {
+ co_await rafts.wait_log(update);
+ },
+ [&rafts] (set_config update) -> future<> {
+ co_await rafts.change_configuration(update);
+ },
+ [&rafts] (check_rpc_config update) -> future<> {
+ co_await rafts.check_rpc_config(update);
+ },
+ [&rafts] (check_rpc_added update) -> future<> {
+ rafts.check_rpc_added(update);
+ co_return;
+ },
+ [&rafts] (check_rpc_removed update) -> future<> {
+ rafts.check_rpc_removed(update);
+ co_return;
+ },
+ [&rafts] (rpc_reset_counters update) -> future<> {
+ rafts.rpc_reset_counters(update);
+ co_return;
+ },
+ [&rafts] (tick update) -> future<> {
+ co_await rafts.tick(update);
+ }
+ ), std::move(update));
}
- ), std::move(update));
- }

- // Reconnect and bring all nodes back into configuration, if needed
- rafts.connect_all();
- co_await rafts.reconfigure_all();
+ // Reconnect and bring all nodes back into configuration, if needed
+ rafts.connect_all();
+ co_await rafts.reconfigure_all();

- if (test.total_values > 0) {
- BOOST_TEST_MESSAGE("Appending remaining values");
- co_await rafts.add_remaining_entries();
- co_await rafts.wait_all();
- }
+ if (test.total_values > 0) {
+ BOOST_TEST_MESSAGE("Appending remaining values");
+ co_await rafts.add_remaining_entries();
+ co_await rafts.wait_all();
+ }

- co_await rafts.stop_all();
+ co_await rafts.stop_all();

- if (test.total_values > 0) {
- rafts.verify();
+ if (test.total_values > 0) {
+ rafts.verify();
+ }
}
-}
+};

+template <typename Clock>
void replication_test(struct test_case test, bool prevote, bool packet_drops,
- lowres_clock::duration tick_delta) {
- run_test(std::move(test), prevote, packet_drops, tick_delta).get();
+ typename Clock::duration tick_delta) {
+ run_test<Clock>{}(std::move(test), prevote, packet_drops, tick_delta).get();
}
diff --git a/test/raft/replication_test.cc b/test/raft/replication_test.cc
index ad6d7b1445..b407ff9ad0 100644
--- a/test/raft/replication_test.cc
+++ b/test/raft/replication_test.cc
@@ -28,13 +28,13 @@ lowres_clock::duration tick_delta = 1ms;

#define RAFT_TEST_CASE(test_name, test_body) \
SEASTAR_THREAD_TEST_CASE(test_name) { \
- replication_test(test_body, false, false, tick_delta); } \
+ replication_test<lowres_clock>(test_body, false, false, tick_delta); } \
SEASTAR_THREAD_TEST_CASE(test_name ## _drops) { \
- replication_test(test_body, false, true, tick_delta); } \
+ replication_test<lowres_clock>(test_body, false, true, tick_delta); } \
SEASTAR_THREAD_TEST_CASE(test_name ## _prevote) { \
- replication_test(test_body, true, false, tick_delta); } \
+ replication_test<lowres_clock>(test_body, true, false, tick_delta); } \
SEASTAR_THREAD_TEST_CASE(test_name ## _prevote_drops) { \
- replication_test(test_body, true, true, tick_delta); }
+ replication_test<lowres_clock>(test_body, true, true, tick_delta); }

// 1 nodes, simple replication, empty, no updates
RAFT_TEST_CASE(simple_replication, (test_case{
@@ -201,7 +201,7 @@ RAFT_TEST_CASE(drops_04_dueling_repro, (test_case{

// TODO: change to RAFT_TEST_CASE once it's stable for handling packet drops
SEASTAR_THREAD_TEST_CASE(test_take_snapshot_and_stream) {
- replication_test(
+ replication_test<lowres_clock>(
// Snapshot automatic take and load
{.nodes = 3,
.config = {{.snapshot_threshold = 10, .snapshot_trailing = 5}},
@@ -223,7 +223,7 @@ RAFT_TEST_CASE(conf_changes_2, (test_case{

// Check removing a node from configuration, adding entries; cycle for all combinations
SEASTAR_THREAD_TEST_CASE(remove_node_cycle) {
- replication_test(
+ replication_test<lowres_clock>(
{.nodes = 4,
.updates = {set_config{0,1,2}, entries{2}, new_leader{1},
set_config{1,2,3}, entries{2}, new_leader{2},
@@ -235,7 +235,7 @@ SEASTAR_THREAD_TEST_CASE(remove_node_cycle) {
}

SEASTAR_THREAD_TEST_CASE(test_leader_change_during_snapshot_transfere) {
- replication_test(
+ replication_test<lowres_clock>(
{.nodes = 3,
.initial_snapshots = {{.snap = {.idx = raft::index_t(10),
.term = raft::term_t(1),
@@ -547,4 +547,3 @@ RAFT_TEST_CASE(rpc_configuration_truncate_restore_from_log, (test_case{
check_rpc_config{{node_id{0},node_id{1},node_id{2}},
rpc_address_set{node_id{0},node_id{1},node_id{2}}},
}}));
-
--
2.31.1

Alejo Sanchez

Jul 16, 2021, 11:58:58 AM
to scylla...@googlegroups.com, kos...@scylladb.com, gl...@scylladb.com, tgra...@scylladb.com, Alejo Sanchez
Support disconnecting a pair of nodes from each other, or one node from all the rest.
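
A usage sketch (the update names are from this patch; the test body itself
is hypothetical):

    // disconnect2{a, b} cuts only the a <-> b link; disconnect1{n}
    // isolates n from everyone, and if n was the leader the cluster
    // runs a free election.
    test_case{.nodes = 3,
              .updates = {entries{5},
                          disconnect2{1, 2},  // pairwise cut
                          entries{5},
                          disconnect1{0},     // isolate the old leader
                          entries{5}}};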

Signed-off-by: Alejo Sanchez <alejo....@scylladb.com>
---
test/raft/replication.hh | 34 +++++++++++++++++++++++++++-------
1 file changed, 27 insertions(+), 7 deletions(-)

diff --git a/test/raft/replication.hh b/test/raft/replication.hh
index 35f647f4e6..d3645d31e1 100644
--- a/test/raft/replication.hh
+++ b/test/raft/replication.hh
@@ -142,12 +142,18 @@ struct leader {
};
using partition = std::vector<std::variant<leader,int>>;

+
+// Disconnect a node from the rest
+struct disconnect1 {
+ size_t id;
+};
+
// Disconnect 2 servers both ways
struct two_nodes {
size_t first;
size_t second;
};
-struct disconnect : public two_nodes {};
+struct disconnect2 : public two_nodes {};

struct stop {
size_t id;
@@ -213,9 +219,9 @@ struct tick {
uint64_t ticks;
};

-using update = std::variant<entries, new_leader, partition, disconnect, stop, reset, wait_log,
- set_config, check_rpc_config, check_rpc_added, check_rpc_removed, rpc_reset_counters,
- tick>;
+using update = std::variant<entries, new_leader, partition, disconnect1, disconnect2,
+ stop, reset, wait_log, set_config, check_rpc_config, check_rpc_added,
+ check_rpc_removed, rpc_reset_counters, tick>;

struct log_entry {
unsigned term;
@@ -346,7 +352,8 @@ class raft_cluster {
future<> tick(::tick t);
future<> stop(::stop server);
future<> reset(::reset server);
- void disconnect(::disconnect nodes);
+ void disconnect(::disconnect2 nodes);
+ future<> disconnect(::disconnect1 nodes);
void verify();
private:
test_server create_server(size_t id, initial_state state);
@@ -1154,10 +1161,20 @@ future<> raft_cluster<Clock>::reset(::reset server) {
}

template <typename Clock>
-void raft_cluster<Clock>::disconnect(::disconnect nodes) {
+void raft_cluster<Clock>::disconnect(::disconnect2 nodes) {
_connected->cut(to_raft_id(nodes.first), to_raft_id(nodes.second));
}

+template <typename Clock>
+future<> raft_cluster<Clock>::disconnect(::disconnect1 node) {
+ _connected->disconnect(to_raft_id(node.id));
+ if (node.id == _leader) {
+ _servers[_leader].server->elapse_election(); // make old leader step down
+ co_await free_election();
+ }
+ co_return;
+}
+
template <typename Clock>
void raft_cluster<Clock>::verify() {
BOOST_TEST_MESSAGE("Verifying hashes match expected (snapshot and apply calls)");
@@ -1232,10 +1249,13 @@ struct run_test {
[&rafts] (new_leader update) -> future<> {
co_await rafts.elect_new_leader(update.id);
},
- [&rafts] (disconnect update) -> future<> {
+ [&rafts] (disconnect2 update) -> future<> {
rafts.disconnect(update);
co_return;
},
+ [&rafts] (::disconnect1 update) -> future<> {
+ co_await rafts.disconnect(update);
+ },
[&rafts] (partition update) -> future<> {
co_await rafts.partition(update);
},
--
2.31.1

Alejo Sanchez

Jul 16, 2021, 11:59:00 AM
to scylla...@googlegroups.com, kos...@scylladb.com, gl...@scylladb.com, tgra...@scylladb.com, Alejo Sanchez
Allow specifying node ranges within a partition, to handle tests with a
large number of nodes.
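
For example, a hypothetical update for a 600-node cluster that keeps the
leader plus nodes 1..300 connected (a majority) and cuts off everyone else:

    // range is inclusive on both ends.
    partition{leader{0}, range{1, 300}}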

Signed-off-by: Alejo Sanchez <alejo....@scylladb.com>
---
test/raft/replication.hh | 18 ++++++++++++++----
1 file changed, 14 insertions(+), 4 deletions(-)

diff --git a/test/raft/replication.hh b/test/raft/replication.hh
index d3645d31e1..6a48c8de2b 100644
--- a/test/raft/replication.hh
+++ b/test/raft/replication.hh
@@ -140,7 +140,12 @@ struct new_leader {
struct leader {
size_t id;
};
-using partition = std::vector<std::variant<leader,int>>;
+// Inclusive range
+struct range {
+ size_t start;
+ size_t end;
+};
+using partition = std::vector<std::variant<leader,range,int>>;


// Disconnect a node from the rest
@@ -1104,11 +1109,16 @@ future<> raft_cluster<Clock>::partition(::partition p) {
size_t id;
if (std::holds_alternative<struct leader>(s)) {
next_leader = std::get<struct leader>(s).id;
- id = *next_leader;
+ partition_servers.insert(*next_leader);
+ } else if (std::holds_alternative<struct range>(s)) {
+ auto range = std::get<struct range>(s);
+ for (size_t id = range.start; id <= range.end; id++) {
+ assert(id < _servers.size());
+ partition_servers.insert(id);
+ }
} else {
- id = std::get<int>(s);
+ partition_servers.insert(std::get<int>(s));
}
- partition_servers.insert(id);
}
if (next_leader) {
// Wait for log to propagate to next leader, before disconnections
--
2.31.1

Alejo Sanchez

Jul 16, 2021, 11:59:01 AM
to scylla...@googlegroups.com, kos...@scylladb.com, gl...@scylladb.com, tgra...@scylladb.com, Alejo Sanchez
Add a test with many nodes, using realistic timers and tick rates.

Signed-off-by: Alejo Sanchez <alejo....@scylladb.com>
---
configure.py | 2 ++
test/raft/many_test.cc | 48 ++++++++++++++++++++++++++++++++++++++++++
2 files changed, 50 insertions(+)
create mode 100644 test/raft/many_test.cc

diff --git a/configure.py b/configure.py
index 4bd29434d1..b8ece9b7a9 100755
--- a/configure.py
+++ b/configure.py
@@ -553,6 +553,7 @@ perf_tests = set([
raft_tests = set([
'test/raft/replication_test',
'test/raft/randomized_nemesis_test',
+ 'test/raft/many_test',
'test/raft/fsm_test',
'test/raft/etcd_test',
'test/raft/raft_sys_table_storage_test',
@@ -1255,6 +1256,7 @@ deps['test/boost/alternator_unit_test'] += ['alternator/base64.cc']

deps['test/raft/replication_test'] = ['test/raft/replication_test.cc', 'test/raft/helpers.cc'] + scylla_minimal_raft_dependencies
deps['test/raft/randomized_nemesis_test'] = ['test/raft/randomized_nemesis_test.cc'] + scylla_minimal_raft_dependencies
+deps['test/raft/many_test'] = ['test/raft/many_test.cc', 'test/raft/helpers.cc'] + scylla_raft_dependencies
deps['test/raft/fsm_test'] = ['test/raft/fsm_test.cc', 'test/raft/helpers.cc', 'test/lib/log.cc'] + scylla_minimal_raft_dependencies
deps['test/raft/etcd_test'] = ['test/raft/etcd_test.cc', 'test/raft/helpers.cc', 'test/lib/log.cc'] + scylla_minimal_raft_dependencies
deps['test/raft/raft_sys_table_storage_test'] = ['test/raft/raft_sys_table_storage_test.cc'] + \
diff --git a/test/raft/many_test.cc b/test/raft/many_test.cc
new file mode 100644
index 0000000000..e26b982585
--- /dev/null
+++ b/test/raft/many_test.cc
@@ -0,0 +1,48 @@
+/*
+ * Copyright (C) 2021-present ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+// Test Raft library with many candidates
+
+#include "replication.hh"
+
+// Using slower but precise clock
+size_t tick_delta_n = 100;
+seastar::steady_clock_type::duration tick_delta = tick_delta_n * 1ms; // 100ms
+auto network_delay = 30ms; // 1/3rd of tick
+auto local_delay = 1ms; // same host latency
+auto extra_delay_max_n = 500; // extra randomized per rpc delay (us)
+uint64_t local_mask = ~((1l<<32) - 1); // prefix mask for nodes (shards) per server
+
+
+using update = std::variant<entries, new_leader, partition, disconnect1, disconnect2,
+ stop, reset, wait_log, set_config, check_rpc_config, check_rpc_added,
+ check_rpc_removed, rpc_reset_counters, tick>;
+
+SEASTAR_THREAD_TEST_CASE(test_take_snapshot_and_stream_prevote) {
+ replication_test<steady_clock_type>(
+ { .nodes = 600, .total_values = 10,
+ .updates = {entries{1},
+ disconnect1{0}, // drop leader, free election
+ entries{2},
+ }}
+ , true, false, tick_delta);
+}
+
--
2.31.1

Alejo Sanchez

Jul 17, 2021, 9:02:57 AM
to scylladb-dev, Konstantin Osipov, Gleb Natapov, Tomasz Grabiec
Please hang on, in the refactor I forgot to add delays to RPC.
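
A rough sketch of the kind of per-message delay v2 adds to the test rpc
(names and values here are illustrative, not the exact v2 code):

    // Impose a network delay before delivering a message so steady_clock
    // tests see realistic latencies instead of instant RPC.
    future<> deliver_with_delay(std::function<void()> deliver,
                                std::chrono::microseconds delay) {
        co_await seastar::sleep(delay);
        deliver();
    }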

Alejo Sanchez

Jul 23, 2021, 5:14:58 AM
to scylla...@googlegroups.com, kos...@scylladb.com, gl...@scylladb.com, tgra...@scylladb.com, Alejo Sanchez
Factor out replication test, make it work with different clocks, add
some features, and add a many nodes test with steady_clock. Also
refactor common test helper.

The many nodes test passes with 600 nodes, but at higher node counts it
needs prevote follower backoff and non-uniform candidate timeouts.
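
The non-uniform timeout mitigation mentioned here is the usual Raft
randomization; a minimal sketch of the idea (illustrative only, not code
from this series):

    #include <chrono>
    #include <random>

    // Each server draws its election timeout uniformly from [T, 2T) so
    // candidates rarely fire on the same tick and dueling elections die out.
    std::chrono::milliseconds randomized_election_timeout(std::chrono::milliseconds base) {
        static thread_local std::mt19937 gen{std::random_device{}()};
        std::uniform_int_distribution<std::chrono::milliseconds::rep> dist(
                base.count(), 2 * base.count() - 1);
        return std::chrono::milliseconds(dist(gen));
    }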

Branch URL: https://github.com/alecco/scylla/tree/raft-many-11

Tests: unit ({dev}), unit ({debug}), unit ({release})

Changes in v2:
- ticker delays
- rpc delays
- fix restart_tickers in partitioning
- fix minimum granularity in replication_test
- partition ranges
- remove unused tick_all()
- two tests in many nodes
- more logging
- rebase on latest master

Alejo Sanchez (12):
raft: log election stages
raft: testing: refactor helper
raft: replication test: move common code out
raft: replication test: tick delta inside raft_cluster
raft: replication test: template clock type
raft: replication test: disconnect one or disconnect pair
raft: replication test: partition ranges
raft: replication test: restart tickers
raft: replication test: use minimum granularity
raft: replication test: delays
raft: replication test: remove unused tick_all
raft: testing: many nodes test

configure.py | 8 +-
raft/fsm.cc | 4 +
test/raft/etcd_test.cc | 2 +
test/raft/fsm_test.cc | 2 +
test/raft/{helpers.hh => helpers.cc} | 107 +-
test/raft/helpers.hh | 145 +-
test/raft/many_test.cc | 52 +
.../{replication_test.cc => replication.hh} | 1075 ++++++--------
test/raft/replication_test.cc | 1241 +----------------
9 files changed, 511 insertions(+), 2125 deletions(-)
copy test/raft/{helpers.hh => helpers.cc} (63%)
create mode 100644 test/raft/many_test.cc
copy test/raft/{replication_test.cc => replication.hh} (55%)

--
2.31.1

Alejo Sanchez

Jul 23, 2021, 5:15:00 AM
to scylla...@googlegroups.com, kos...@scylladb.com, gl...@scylladb.com, tgra...@scylladb.com, Alejo Sanchez
Add trace logging to follow election stages.

Signed-off-by: Alejo Sanchez <alejo....@scylladb.com>
---
raft/fsm.cc | 4 ++++
1 file changed, 4 insertions(+)

diff --git a/raft/fsm.cc b/raft/fsm.cc
index 2a9d744907..0852b32107 100644
--- a/raft/fsm.cc
+++ b/raft/fsm.cc
@@ -235,8 +235,10 @@ void fsm::become_candidate(bool is_prevote, bool is_leadership_transfer) {
if (votes.tally_votes() == vote_result::WON) {
// A single node cluster.
if (is_prevote) {
+ logger.trace("{} become_candidate: won prevote", _my_id);
become_candidate(false);
} else {
+ logger.trace("{} become_candidate: won vote", _my_id);
become_leader();
}
}
@@ -742,8 +744,10 @@ void fsm::request_vote_reply(server_id from, vote_reply&& reply) {
break;
case vote_result::WON:
if (state.is_prevote) {
+ logger.trace("{} request_vote_reply: won prevote", _my_id);
become_candidate(false);
} else {
+ logger.trace("{} request_vote_reply: won vote", _my_id);
become_leader();
}
break;
--
2.31.1

Alejo Sanchez

Jul 23, 2021, 5:15:01 AM
to scylla...@googlegroups.com, kos...@scylladb.com, gl...@scylladb.com, tgra...@scylladb.com, Alejo Sanchez
Move definitions to helper object file.

Signed-off-by: Alejo Sanchez <alejo....@scylladb.com>
---
configure.py | 6 +-
test/raft/etcd_test.cc | 2 +
test/raft/fsm_test.cc | 2 +
test/raft/{helpers.hh => helpers.cc} | 107 ++------------------
test/raft/helpers.hh | 145 +++++----------------------
5 files changed, 39 insertions(+), 223 deletions(-)
copy test/raft/{helpers.hh => helpers.cc} (63%)

diff --git a/configure.py b/configure.py
index 94a2913d71..4b9c369339 100755
--- a/configure.py
+++ b/configure.py
@@ -1252,10 +1252,10 @@ deps['test/boost/linearizing_input_stream_test'] = [
deps['test/boost/duration_test'] += ['test/lib/exception_utils.cc']
deps['test/boost/alternator_unit_test'] += ['alternator/base64.cc']

-deps['test/raft/replication_test'] = ['test/raft/replication_test.cc'] + scylla_raft_dependencies
+deps['test/raft/replication_test'] = ['test/raft/replication_test.cc', 'test/raft/helpers.cc'] + scylla_raft_dependencies
deps['test/raft/randomized_nemesis_test'] = ['test/raft/randomized_nemesis_test.cc'] + scylla_raft_dependencies
-deps['test/raft/fsm_test'] = ['test/raft/fsm_test.cc', 'test/lib/log.cc'] + scylla_raft_dependencies
-deps['test/raft/etcd_test'] = ['test/raft/etcd_test.cc', 'test/lib/log.cc'] + scylla_raft_dependencies
+deps['test/raft/fsm_test'] = ['test/raft/fsm_test.cc', 'test/raft/helpers.cc', 'test/lib/log.cc'] + scylla_raft_dependencies
+deps['test/raft/etcd_test'] = ['test/raft/etcd_test.cc', 'test/raft/helpers.cc', 'test/lib/log.cc'] + scylla_raft_dependencies
deps['test/raft/raft_sys_table_storage_test'] = ['test/raft/raft_sys_table_storage_test.cc'] + \
scylla_core + scylla_tests_generic_dependencies
deps['test/raft/raft_address_map_test'] = ['test/raft/raft_address_map_test.cc'] + scylla_core
diff --git a/test/raft/etcd_test.cc b/test/raft/etcd_test.cc
index 872ba8ccf0..c051eb595f 100644
--- a/test/raft/etcd_test.cc
+++ b/test/raft/etcd_test.cc
@@ -35,6 +35,8 @@

// Port of etcd Raft implementation unit tests

+#define BOOST_TEST_MODULE raft
+
#include "test/raft/helpers.hh"

using namespace raft;
diff --git a/test/raft/fsm_test.cc b/test/raft/fsm_test.cc
index 299eb0a0d4..d9a8f4e1ae 100644
--- a/test/raft/fsm_test.cc
+++ b/test/raft/fsm_test.cc
@@ -19,6 +19,8 @@
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
- }
- }
- }
- }
- } while (has_traffic);
-}
+communicate_impl(std::function<bool()> stop_pred, raft_routing_map& map);

template <typename... Args>
void communicate_until(std::function<bool()> stop_pred, Args&&... args) {
@@ -212,44 +140,19 @@ raft::fsm* select_leader(Args&&... args) {
}


-raft::server_id id() {
- static int id = 0;
- return raft::server_id{utils::UUID(0, ++id)};
-}
-
-raft::server_address_set address_set(std::vector<raft::server_id> ids) {
- raft::server_address_set set;

Alejo Sanchez

Jul 23, 2021, 5:15:04 AM
to scylla...@googlegroups.com, kos...@scylladb.com, gl...@scylladb.com, tgra...@scylladb.com, Alejo Sanchez
Common replication test code moved to header.

Signed-off-by: Alejo Sanchez <alejo....@scylladb.com>
---
.../{replication_test.cc => replication.hh} | 525 +------
test/raft/replication_test.cc | 1221 +----------------
2 files changed, 3 insertions(+), 1743 deletions(-)
copy test/raft/{replication_test.cc => replication.hh} (64%)

diff --git a/test/raft/replication_test.cc b/test/raft/replication.hh
similarity index 64%
copy from test/raft/replication_test.cc
copy to test/raft/replication.hh
index 9e86821bea..902c990880 100644
--- a/test/raft/replication_test.cc
+++ b/test/raft/replication.hh
@@ -19,6 +19,8 @@
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/

+#pragma once
+
#include <random>
#include <seastar/core/app-template.hh>
#include <seastar/core/sleep.hh>
@@ -1241,526 +1243,3 @@ future<> run_test(test_case test, bool prevote, bool packet_drops) {
void replication_test(struct test_case test, bool prevote, bool packet_drops) {
run_test(std::move(test), prevote, packet_drops).get();
}
-
-#define RAFT_TEST_CASE(test_name, test_body) \
- SEASTAR_THREAD_TEST_CASE(test_name) { \
- replication_test(test_body, false, false); } \
-// TODO: change to RAFT_TEST_CASE once it's stable for handling packet drops
-SEASTAR_THREAD_TEST_CASE(test_take_snapshot_and_stream) {
- replication_test(
- // Snapshot automatic take and load
- {.nodes = 3,
- .config = {{.snapshot_threshold = 10, .snapshot_trailing = 5}},
- .updates = {entries{5}, partition{0,1}, entries{10}, partition{0, 2}, entries{20}}}
- , false, false);
- .updates = {tick{10} /* ticking starts snapshot transfer */, new_leader{1}, entries{10}}}
- , false, false);
- .snapshot = {.config = address_set({node_id{0},node_id{1},node_id{2},node_id{3}})
- }
- }},
-
- // A should observe RPC configuration = {A, B, C, D} since it's the union
- // of an uncommitted joint config components
- // {.current = {A, B, C, D}, .previous = {A, B, C}}.
- check_rpc_config{node_id{0},
- rpc_address_set{node_id{0},node_id{1},node_id{2},node_id{3}}},
-
- // Elect B as leader
- new_leader{1},
-
- // Heal network partition. Connect all.
- partition{0,1,2,3},
-
- // wait to synchronize logs between current leader (B) and the rest of the cluster
- wait_log{0,2},
-
- // A's RPC configuration is reverted to committed configuration {A, B, C}.
- check_rpc_config{{node_id{0},node_id{1},node_id{2}},
- rpc_address_set{node_id{0},node_id{1},node_id{2}}},
- }}));
-
diff --git a/test/raft/replication_test.cc b/test/raft/replication_test.cc
index 9e86821bea..4bdbb133ce 100644
--- a/test/raft/replication_test.cc
+++ b/test/raft/replication_test.cc
@@ -19,1228 +19,9 @@
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/

-#include <random>
-#include <seastar/core/app-template.hh>
-#include <seastar/core/sleep.hh>
-#include <seastar/core/coroutine.hh>
-#include <seastar/core/loop.hh>
-#include <seastar/util/log.hh>
-#include <seastar/util/later.hh>
-#include <seastar/util/variant_utils.hh>
-#include <seastar/testing/random.hh>
-#include <seastar/testing/thread_test_case.hh>
-#include <seastar/testing/test_case.hh>
-#include "raft/server.hh"
-#include "serializer.hh"
-#include "serializer_impl.hh"
-#include "xx_hasher.hh"
-#include "test/raft/helpers.hh"
-#include "test/lib/eventually.hh"
+#include "replication.hh"

// Test Raft library with declarative test definitions
-static seastar::logger tlogger("test");
-
-lowres_clock::duration tick_delta = 1ms;
-
-}
-
-raft::server_address_set address_set(std::vector<node_id> nodes) {
- return address_set(to_raft_id_vec(nodes));
-}
-
-// Updates can be
-// - Entries
-// - Leader change
-// - Configuration change
-struct entries {
- size_t n;
-};
-struct new_leader {
- size_t id;
-};
-struct leader {
- size_t id;
-};
-using partition = std::vector<std::variant<leader,int>>;
-using update = std::variant<entries, new_leader, partition, disconnect, stop, reset, wait_log,
- set_config, check_rpc_config, check_rpc_added, check_rpc_removed, rpc_reset_counters,
- tick>;
- std::unordered_set<size_t> _in_configuration; // Servers in current configuration
- std::vector<seastar::timer<lowres_clock>> _tickers;
- future<> reset(::reset server);
- void disconnect(::disconnect nodes);
- void verify();
-private:
- test_server create_server(size_t id, initial_state state);
-};
-
-class raft_cluster::state_machine : public raft::state_machine {
-class raft_cluster::persistence : public raft::persistence {
-class raft_cluster::failure_detector : public raft::failure_detector {
- raft::server_id _id;
- connected* _connected;
-public:
- failure_detector(raft::server_id id, connected* connected) : _id(id), _connected(connected) {}
- bool is_alive(raft::server_id server) override {
- return (*_connected)(server, _id);
- }
-};
-
-class raft_cluster::rpc : public raft::rpc {
-std::unordered_map<raft::server_id, raft_cluster::rpc*> raft_cluster::rpc::net;
-
-raft_cluster::test_server raft_cluster::create_server(size_t id, initial_state state) {
-// Reset previously stopped server
-future<> raft_cluster::reset_server(size_t id, initial_state state) {
-void raft_cluster::disconnect(size_t id, std::optional<raft::server_id> except) {
- _connected->disconnect(to_raft_id(id), except);
-}
-
-void raft_cluster::connect_all() {
- _connected->connect_all();
-}
-
-// Add consecutive integer entries to a leader
-future<> raft_cluster::add_entries(size_t n) {
- size_t end = _next_val + n;
- while (_next_val != end) {
- try {
- co_await _servers[_leader].server->add_entry(create_command(_next_val), raft::wait_type::committed);
- _next_val++;
- } catch (raft::not_a_leader& e) {
- // leader stepped down, update with new leader if present
- if (e.leader != raft::server_id{}) {
- _leader = to_local_id(e.leader.id);
- }
- } catch (raft::commit_status_unknown& e) {
- } catch (raft::dropped_entry& e) {
- // retry if an entry is dropped because the leader have changed after it was submitetd
- }
- }
-}
-
-future<> raft_cluster::add_remaining_entries() {
- co_await add_entries(_apply_entries - _next_val);
-}
-
-void raft_cluster::init_raft_tickers() {
- _tickers.resize(_servers.size());
- // Only start tickers for servers in configuration
- for (auto s: _in_configuration) {
- _tickers[s].arm_periodic(tick_delta);
- _tickers[s].set_callback([&, s] {
- _servers[s].server->tick();
- });
- }
-}
-
-void raft_cluster::pause_tickers() {
- for (auto s: _in_configuration) {
- _tickers[s].cancel();
- }
-}
-
-void raft_cluster::restart_tickers() {
- for (auto s: _in_configuration) {
- _tickers[s].rearm_periodic(tick_delta);
- }
-}
-
-void raft_cluster::cancel_ticker(size_t id) {
- _tickers[id].cancel();
-}
-
-void raft_cluster::set_ticker_callback(size_t id) noexcept {
-// Wait for leader log to propagate to follower
-future<> raft_cluster::wait_log(size_t follower) {
- if ((*_connected)(to_raft_id(_leader), to_raft_id(follower)) &&
- _in_configuration.contains(_leader) && _in_configuration.contains(follower)) {
- auto leader_log_idx_term = _servers[_leader].server->log_last_idx_term();
- co_await _servers[follower].server->wait_log_idx_term(leader_log_idx_term);
- }
-}
-
-// Wait for leader log to propagate to specified followers
-future<> raft_cluster::wait_log(::wait_log followers) {
- auto leader_log_idx_term = _servers[_leader].server->log_last_idx_term();
- for (auto s: followers.local_ids) {
- co_await _servers[s].server->wait_log_idx_term(leader_log_idx_term);
- }
-}
-
-// Wait for all connected followers to catch up
-future<> raft_cluster::wait_log_all() {
-// Run a free election of nodes in configuration
-// NOTE: there should be enough nodes capable of participating
-future<> raft_cluster::free_election() {
- tlogger.debug("Running free election");
- elapse_elections();
- size_t node = 0;
- for (;;) {
- tick_all();
- co_await seastar::sleep(10us); // Wait for election rpc exchanges
- // find if we have a leader
- for (auto s: _in_configuration) {
- if (_servers[s].server->is_leader()) {
- tlogger.debug("New leader {}", s);
- _leader = s;
- co_return;
- }
- }
- }
-}
-
-future<> raft_cluster::change_configuration(set_config sc) {
- BOOST_CHECK_MESSAGE(sc.size() > 0, "Empty configuration change not supported");
- raft::server_address_set set;
- std::unordered_set<size_t> new_config;
- for (auto s: sc) {
- new_config.insert(s.node_idx);
- auto addr = to_server_address(s.node_idx);
- addr.can_vote = s.can_vote;
- set.insert(std::move(addr));
- BOOST_CHECK_MESSAGE(s.node_idx < _servers.size(),
- format("Configuration element {} past node limit {}", s.node_idx, _servers.size() - 1));
- }
- BOOST_CHECK_MESSAGE(new_config.contains(_leader) || sc.size() < (_servers.size()/2 + 1),
- "New configuration without old leader and below quorum size (no election)");
-
- if (!new_config.contains(_leader)) {
- // Wait log on all nodes in new config before change
- for (auto s: sc) {
- co_await wait_log(s.node_idx);
- }
- }
-
- // Start nodes in new configuration but not in current configuration (re-added)
- for (auto s: new_config) {
- if (!_in_configuration.contains(s)) {
- tlogger.debug("Starting node being re-added to configuration {}", s);
- co_await reset_server(s, initial_state{.log = {}});
- _tickers[s].rearm_periodic(tick_delta);
-future<> raft_cluster::check_rpc_config(::check_rpc_config cc) {
- auto as = address_set(cc.addrs);
- for (auto& node: cc.nodes) {
- BOOST_CHECK(node.id < _servers.size());
- co_await seastar::async([&] {
- CHECK_EVENTUALLY_EQUAL(_servers[node.id].rpc->known_peers(), as);
- });
- }
-}
-
-void raft_cluster::check_rpc_added(::check_rpc_added expected) const {
- for (auto node: expected.nodes) {
- BOOST_CHECK_MESSAGE(_servers[node.id].rpc->servers_added() == expected.expected,
- format("RPC added {} does not match expected {}",
- _servers[node.id].rpc->servers_added(), expected.expected));
- }
-}
-
-void raft_cluster::check_rpc_removed(::check_rpc_removed expected) const {
- for (auto node: expected.nodes) {
- BOOST_CHECK_MESSAGE(_servers[node.id].rpc->servers_removed() == expected.expected,
- format("RPC removed {} does not match expected {}",
- _servers[node.id].rpc->servers_removed(), expected.expected));
- }
-}
-
-void raft_cluster::rpc_reset_counters(::rpc_reset_counters nodes) {
- for (auto node: nodes) {
- _servers[node.id].rpc->reset_counters();
- }
-}
-
-future<> raft_cluster::reconfigure_all() {
- if (_in_configuration.size() < _servers.size()) {
- set_config sc;
- for (size_t s = 0; s < _servers.size(); ++s) {
- sc.push_back(s);
- }
- co_await change_configuration(std::move(sc));
- }
-}
-
-future<> raft_cluster::partition(::partition p) {
- std::unordered_set<size_t> partition_servers;
- std::optional<size_t> next_leader;
- for (auto s: p) {
- size_t id;
- if (std::holds_alternative<struct leader>(s)) {
- next_leader = std::get<struct leader>(s).id;
- id = *next_leader;
- } else {
- id = std::get<int>(s);
-future<> raft_cluster::tick(::tick t) {
- for (uint64_t i = 0; i < t.ticks; i++) {
- for (auto&& s: _servers) {
- s.server->tick();
- }
- co_await later();
- }
-}
-
-future<> raft_cluster::stop(::stop server) {
- co_await stop_server(server.id);
-}
-
-future<> raft_cluster::reset(::reset server) {
- co_await reset_server(server.id, server.state);
-}
-
-void raft_cluster::disconnect(::disconnect nodes) {
- _connected->cut(to_raft_id(nodes.first), to_raft_id(nodes.second));
-}
-
-void raft_cluster::verify() {
- BOOST_TEST_MESSAGE("Verifying hashes match expected (snapshot and apply calls)");
- auto expected = hasher_int::hash_range(_apply_entries).finalize_uint64();
- for (auto i: _in_configuration) {
- auto digest = _servers[i].sm->hasher->finalize_uint64();
- BOOST_CHECK_MESSAGE(digest == expected,
- format("Digest doesn't match for server [{}]: {} != {}", i, digest, expected));
- }
-
- BOOST_TEST_MESSAGE("Verifying persisted snapshots");
- // TODO: check that snapshot is taken when it should be
- for (auto& s : (*_persisted_snapshots)) {
- auto& [snp, val] = s.second;
- auto digest = val.hasher.finalize_uint64();
- auto expected = hasher_int::hash_range(val.idx).finalize_uint64();
- BOOST_CHECK_MESSAGE(digest == expected,
- format("Persisted snapshot {} doesn't match {} != {}", snp.id, digest, expected));
- }
-}
-
-std::vector<initial_state> raft_cluster::get_states(test_case test, bool prevote) {
-future<> run_test(test_case test, bool prevote, bool packet_drops) {
-
- raft_cluster rafts(test, apply_changes, test.total_values,
- test.get_first_val(), test.initial_leader, prevote, packet_drops);
- co_await rafts.start_all();
-
- BOOST_TEST_MESSAGE("Processing updates");
-
- // Process all updates in order
- for (auto update: test.updates) {
- co_await std::visit(make_visitor(
- [&rafts] (entries update) -> future<> {
- co_await rafts.add_entries(update.n);
- },
- [&rafts] (new_leader update) -> future<> {
- co_await rafts.elect_new_leader(update.id);
- },
- [&rafts] (disconnect update) -> future<> {
- }
- ), std::move(update));
- }
-
- // Reconnect and bring all nodes back into configuration, if needed
- rafts.connect_all();
- co_await rafts.reconfigure_all();
-
- if (test.total_values > 0) {
- BOOST_TEST_MESSAGE("Appending remaining values");
- co_await rafts.add_remaining_entries();
- co_await rafts.wait_all();
- }
-
- co_await rafts.stop_all();
-
- if (test.total_values > 0) {
- rafts.verify();
- }
-}
-
-void replication_test(struct test_case test, bool prevote, bool packet_drops) {
- run_test(std::move(test), prevote, packet_drops).get();

Alejo Sanchez

Jul 23, 2021, 5:15:05 AM
to scylla...@googlegroups.com, kos...@scylladb.com, gl...@scylladb.com, tgra...@scylladb.com, Alejo Sanchez
Store tick delta inside raft_cluster.

Signed-off-by: Alejo Sanchez <alejo....@scylladb.com>
---
test/raft/replication.hh | 27 +++++++++++++++------------
test/raft/replication_test.cc | 17 ++++++++++-------
2 files changed, 25 insertions(+), 19 deletions(-)
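
For illustration, a hypothetical call with a custom tick period (this test
body and the 50ms value are made up, not part of the series):

    replication_test(
        {.nodes = 3, .total_values = 100, .updates = {entries{100}}},
        false, false, 50ms);  // prevote off, packet drops off, 50ms ticks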

diff --git a/test/raft/replication.hh b/test/raft/replication.hh
index 902c990880..3355d68181 100644
--- a/test/raft/replication.hh
+++ b/test/raft/replication.hh
@@ -77,8 +77,6 @@ using namespace std::placeholders;

static seastar::logger tlogger("test");

-lowres_clock::duration tick_delta = 1ms;
-
auto dummy_command = std::numeric_limits<int>::min();

class hasher_int : public xx_hasher {
@@ -303,11 +301,12 @@ class raft_cluster {
std::vector<seastar::timer<lowres_clock>> _tickers;
size_t _leader;
std::vector<initial_state> get_states(test_case test, bool prevote);
+ lowres_clock::duration _tick_delta;
public:
raft_cluster(test_case test,
apply_fn apply,
size_t apply_entries, size_t first_val, size_t first_leader,
- bool prevote, bool packet_drops);
+ bool prevote, bool packet_drops, lowres_clock::duration tick_delta);
// No copy
raft_cluster(const raft_cluster&) = delete;
raft_cluster(raft_cluster&&) = default;
@@ -640,7 +639,7 @@ raft_cluster::test_server raft_cluster::create_server(size_t id, initial_state s
raft_cluster::raft_cluster(test_case test,
apply_fn apply,
size_t apply_entries, size_t first_val, size_t first_leader,
- bool prevote, bool packet_drops) :
+ bool prevote, bool packet_drops, lowres_clock::duration tick_delta) :
_connected(std::make_unique<struct connected>(test.nodes)),
_snapshots(std::make_unique<snapshots>()),
_persisted_snapshots(std::make_unique<persisted_snapshots>()),
@@ -649,7 +648,8 @@ raft_cluster::raft_cluster(test_case test,
_packet_drops(packet_drops),
_prevote(prevote),
_apply(apply),
- _leader(first_leader) {
+ _leader(first_leader),
+ _tick_delta(tick_delta) {

rpc::reset_network();

@@ -750,7 +750,7 @@ void raft_cluster::init_raft_tickers() {
_tickers.resize(_servers.size());
// Only start tickers for servers in configuration
for (auto s: _in_configuration) {
- _tickers[s].arm_periodic(tick_delta);
+ _tickers[s].arm_periodic(_tick_delta);
_tickers[s].set_callback([&, s] {
_servers[s].server->tick();
});
@@ -765,7 +765,7 @@ void raft_cluster::pause_tickers() {

void raft_cluster::restart_tickers() {
for (auto s: _in_configuration) {
- _tickers[s].rearm_periodic(tick_delta);
+ _tickers[s].rearm_periodic(_tick_delta);
}
}

@@ -978,7 +978,7 @@ future<> raft_cluster::change_configuration(set_config sc) {
if (!_in_configuration.contains(s)) {
tlogger.debug("Starting node being re-added to configuration {}", s);
co_await reset_server(s, initial_state{.log = {}});
- _tickers[s].rearm_periodic(tick_delta);
+ _tickers[s].rearm_periodic(_tick_delta);
}
}

@@ -1166,10 +1166,12 @@ std::vector<initial_state> raft_cluster::get_states(test_case test, bool prevote
return states;
}

-future<> run_test(test_case test, bool prevote, bool packet_drops) {
+future<> run_test(test_case test, bool prevote, bool packet_drops,
+ lowres_clock::duration tick_delta) {

raft_cluster rafts(test, apply_changes, test.total_values,
- test.get_first_val(), test.initial_leader, prevote, packet_drops);
+ test.get_first_val(), test.initial_leader, prevote, packet_drops,
+ tick_delta);
co_await rafts.start_all();

BOOST_TEST_MESSAGE("Processing updates");
@@ -1240,6 +1242,7 @@ future<> run_test(test_case test, bool prevote, bool packet_drops) {
}
}

-void replication_test(struct test_case test, bool prevote, bool packet_drops) {
- run_test(std::move(test), prevote, packet_drops).get();
+void replication_test(struct test_case test, bool prevote, bool packet_drops,
+ lowres_clock::duration tick_delta) {
+ run_test(std::move(test), prevote, packet_drops, tick_delta).get();
}
diff --git a/test/raft/replication_test.cc b/test/raft/replication_test.cc
index 4bdbb133ce..ad6d7b1445 100644
--- a/test/raft/replication_test.cc
+++ b/test/raft/replication_test.cc
@@ -23,15 +23,18 @@

// Test Raft library with declarative test definitions

+
+lowres_clock::duration tick_delta = 1ms;
+
#define RAFT_TEST_CASE(test_name, test_body) \
SEASTAR_THREAD_TEST_CASE(test_name) { \
- replication_test(test_body, false, false); } \
+ replication_test(test_body, false, false, tick_delta); } \
SEASTAR_THREAD_TEST_CASE(test_name ## _drops) { \
- replication_test(test_body, false, true); } \
+ replication_test(test_body, false, true, tick_delta); } \
SEASTAR_THREAD_TEST_CASE(test_name ## _prevote) { \
- replication_test(test_body, true, false); } \
+ replication_test(test_body, true, false, tick_delta); } \
SEASTAR_THREAD_TEST_CASE(test_name ## _prevote_drops) { \
- replication_test(test_body, true, true); }
+ replication_test(test_body, true, true, tick_delta); }

// 1 nodes, simple replication, empty, no updates
RAFT_TEST_CASE(simple_replication, (test_case{
@@ -203,7 +206,7 @@ SEASTAR_THREAD_TEST_CASE(test_take_snapshot_and_stream) {
{.nodes = 3,
.config = {{.snapshot_threshold = 10, .snapshot_trailing = 5}},
.updates = {entries{5}, partition{0,1}, entries{10}, partition{0, 2}, entries{20}}}
- , false, false);
+ , false, false, tick_delta);
}

// Check removing all followers, add entry, bring back one follower and make it leader
@@ -228,7 +231,7 @@ SEASTAR_THREAD_TEST_CASE(remove_node_cycle) {
// TODO: find out why it breaks in release mode
// set_config{3,0,1}, entries{2}, new_leader{0}
}}
- , false, false);
+ , false, false, tick_delta);
}

SEASTAR_THREAD_TEST_CASE(test_leader_change_during_snapshot_transfere) {
@@ -241,7 +244,7 @@ SEASTAR_THREAD_TEST_CASE(test_leader_change_during_snapshot_transfere) {
.term = raft::term_t(1),
.id = delay_apply_snapshot}}},
.updates = {tick{10} /* ticking starts snapshot transfer */, new_leader{1}, entries{10}}}
- , false, false);
+ , false, false, tick_delta);
}

// verifies that each node in a cluster can campaign
--
2.31.1

Alejo Sanchez

Jul 23, 2021, 5:15:08 AM
to scylla...@googlegroups.com, kos...@scylladb.com, gl...@scylladb.com, tgra...@scylladb.com, Alejo Sanchez
Templatize the clock type.

Use a struct with a call operator for run_test, instead of a free coroutine
function template, to work around
https://bugs.llvm.org/show_bug.cgi?id=50345

With help from @kbr-

Signed-off-by: Alejo Sanchez <alejo....@scylladb.com>
---
test/raft/replication.hh | 283 ++++++++++++++++++++--------------
test/raft/replication_test.cc | 15 +-
2 files changed, 173 insertions(+), 125 deletions(-)
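
The workaround pattern, reduced to a sketch (the full version is in the diff
below): the coroutine body moves out of a free function template, which the
linked bug miscompiles, into the call operator of a class template.

    template <typename Clock>
    struct run_test {
        future<> operator() (test_case test, bool prevote, bool packet_drops,
                             typename Clock::duration tick_delta);  // coroutine body
    };

    // call site: construct a temporary and invoke it
    run_test<lowres_clock>{}(std::move(test), prevote, packet_drops, tick_delta).get();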

diff --git a/test/raft/replication.hh b/test/raft/replication.hh
index 3355d68181..35f647f4e6 100644
--- a/test/raft/replication.hh
+++ b/test/raft/replication.hh
@@ -276,6 +276,7 @@ raft::snapshot_id delay_apply_snapshot{utils::UUID(0, 0xdeadbeaf)};
// sending of a snaphot with that id will be delayed until snapshot_sync is signaled
raft::snapshot_id delay_send_snapshot{utils::UUID(0xdeadbeaf, 0)};

+template <typename Clock>
class raft_cluster {
using apply_fn = std::function<size_t(raft::server_id id, const std::vector<raft::command_cref>& commands, lw_shared_ptr<hasher_int> hasher)>;
class state_machine;
@@ -298,15 +299,16 @@ class raft_cluster {
bool _prevote;
apply_fn _apply;
std::unordered_set<size_t> _in_configuration; // Servers in current configuration
- std::vector<seastar::timer<lowres_clock>> _tickers;
+ std::vector<seastar::timer<Clock>> _tickers;
size_t _leader;
std::vector<initial_state> get_states(test_case test, bool prevote);
- lowres_clock::duration _tick_delta;
+ typename Clock::duration _tick_delta;
public:
raft_cluster(test_case test,
apply_fn apply,
size_t apply_entries, size_t first_val, size_t first_leader,
- bool prevote, bool packet_drops, lowres_clock::duration tick_delta);
+ bool prevote, bool packet_drops,
+ typename Clock::duration tick_delta);
// No copy
raft_cluster(const raft_cluster&) = delete;
raft_cluster(raft_cluster&&) = default;
@@ -350,7 +352,8 @@ class raft_cluster {
test_server create_server(size_t id, initial_state state);
};

-class raft_cluster::state_machine : public raft::state_machine {
+template <typename Clock>
+class raft_cluster<Clock>::state_machine : public raft::state_machine {
raft::server_id _id;
apply_fn _apply;
size_t _apply_entries;
@@ -402,7 +405,8 @@ class raft_cluster::state_machine : public raft::state_machine {
}
};

-class raft_cluster::persistence : public raft::persistence {
+template <typename Clock>
+class raft_cluster<Clock>::persistence : public raft::persistence {
raft::server_id _id;
initial_state _conf;
snapshots* _snapshots;
@@ -438,7 +442,8 @@ class raft_cluster::persistence : public raft::persistence {
virtual future<> abort() { return make_ready_future<>(); }
};

-struct raft_cluster::connected {
+template <typename Clock>
+struct raft_cluster<Clock>::connected {
struct connection {
raft::server_id from;
raft::server_id to;
@@ -492,7 +497,8 @@ struct raft_cluster::connected {
}
};

-class raft_cluster::failure_detector : public raft::failure_detector {
+template <typename Clock>
+class raft_cluster<Clock>::failure_detector : public raft::failure_detector {
raft::server_id _id;
connected* _connected;
public:
@@ -502,7 +508,8 @@ class raft_cluster::failure_detector : public raft::failure_detector {
}
};

-class raft_cluster::rpc : public raft::rpc {
+template <typename Clock>
+class raft_cluster<Clock>::rpc : public raft::rpc {
static std::unordered_map<raft::server_id, rpc*> net;
raft::server_id _id;
connected* _connected;
@@ -610,9 +617,11 @@ class raft_cluster::rpc : public raft::rpc {
}
};

-std::unordered_map<raft::server_id, raft_cluster::rpc*> raft_cluster::rpc::net;
+template <typename Clock>
+std::unordered_map<raft::server_id, typename raft_cluster<Clock>::rpc*> raft_cluster<Clock>::rpc::net;

-raft_cluster::test_server raft_cluster::create_server(size_t id, initial_state state) {
+template <typename Clock>
+typename raft_cluster<Clock>::test_server raft_cluster<Clock>::create_server(size_t id, initial_state state) {

auto uuid = to_raft_id(id);
auto sm = std::make_unique<state_machine>(uuid, _apply, _apply_entries, _snapshots.get());
@@ -636,10 +645,12 @@ raft_cluster::test_server raft_cluster::create_server(size_t id, initial_state s
};
}

-raft_cluster::raft_cluster(test_case test,
+template <typename Clock>
+raft_cluster<Clock>::raft_cluster(test_case test,
apply_fn apply,
size_t apply_entries, size_t first_val, size_t first_leader,
- bool prevote, bool packet_drops, lowres_clock::duration tick_delta) :
+ bool prevote, bool packet_drops,
+ typename Clock::duration tick_delta) :
_connected(std::make_unique<struct connected>(test.nodes)),
_snapshots(std::make_unique<snapshots>()),
_persisted_snapshots(std::make_unique<persisted_snapshots>()),
@@ -673,7 +684,8 @@ raft_cluster::raft_cluster(test_case test,
}
}

-future<> raft_cluster::stop_server(size_t id) {
+template <typename Clock>
+future<> raft_cluster<Clock>::stop_server(size_t id) {
cancel_ticker(id);
co_await _servers[id].server->abort();
_snapshots->erase(to_raft_id(id));
@@ -681,13 +693,15 @@ future<> raft_cluster::stop_server(size_t id) {
}

// Reset previously stopped server
-future<> raft_cluster::reset_server(size_t id, initial_state state) {
+template <typename Clock>
+future<> raft_cluster<Clock>::reset_server(size_t id, initial_state state) {
_servers[id] = create_server(id, state);
co_await _servers[id].server->start();
set_ticker_callback(id);
}

-future<> raft_cluster::start_all() {
+template <typename Clock>
+future<> raft_cluster<Clock>::start_all() {
co_await parallel_for_each(_servers, [] (auto& r) {
return r.server->start();
});
@@ -697,34 +711,40 @@ future<> raft_cluster::start_all() {
co_await _servers[_leader].server->wait_election_done();
}

-future<> raft_cluster::stop_all() {
+template <typename Clock>
+future<> raft_cluster<Clock>::stop_all() {
for (auto s: _in_configuration) {
co_await stop_server(s);
};
}

-future<> raft_cluster::wait_all() {
+template <typename Clock>
+future<> raft_cluster<Clock>::wait_all() {
for (auto s: _in_configuration) {
co_await _servers[s].sm->done();
}
}

-void raft_cluster::tick_all() {
+template <typename Clock>
+void raft_cluster<Clock>::tick_all() {
for (auto s: _in_configuration) {
_servers[s].server->tick();
}
}

-void raft_cluster::disconnect(size_t id, std::optional<raft::server_id> except) {
+template <typename Clock>
+void raft_cluster<Clock>::disconnect(size_t id, std::optional<raft::server_id> except) {
_connected->disconnect(to_raft_id(id), except);
}

-void raft_cluster::connect_all() {
+template <typename Clock>
+void raft_cluster<Clock>::connect_all() {
_connected->connect_all();
}

// Add consecutive integer entries to a leader
-future<> raft_cluster::add_entries(size_t n) {
+template <typename Clock>
+future<> raft_cluster<Clock>::add_entries(size_t n) {
size_t end = _next_val + n;
while (_next_val != end) {
try {
@@ -742,11 +762,13 @@ future<> raft_cluster::add_entries(size_t n) {
}
}

-future<> raft_cluster::add_remaining_entries() {
+template <typename Clock>
+future<> raft_cluster<Clock>::add_remaining_entries() {
co_await add_entries(_apply_entries - _next_val);
}

-void raft_cluster::init_raft_tickers() {
+template <typename Clock>
+void raft_cluster<Clock>::init_raft_tickers() {
_tickers.resize(_servers.size());
// Only start tickers for servers in configuration
for (auto s: _in_configuration) {
@@ -757,23 +779,27 @@ void raft_cluster::init_raft_tickers() {
}
}

-void raft_cluster::pause_tickers() {
+template <typename Clock>
+void raft_cluster<Clock>::pause_tickers() {
for (auto s: _in_configuration) {
_tickers[s].cancel();
}
}

-void raft_cluster::restart_tickers() {
+template <typename Clock>
+void raft_cluster<Clock>::restart_tickers() {
for (auto s: _in_configuration) {
_tickers[s].rearm_periodic(_tick_delta);
}
}

-void raft_cluster::cancel_ticker(size_t id) {
+template <typename Clock>
+void raft_cluster<Clock>::cancel_ticker(size_t id) {
_tickers[id].cancel();
}

-void raft_cluster::set_ticker_callback(size_t id) noexcept {
+template <typename Clock>
+void raft_cluster<Clock>::set_ticker_callback(size_t id) noexcept {
_tickers[id].set_callback([&, id] {
_servers[id].server->tick();
});
@@ -814,7 +840,8 @@ size_t apply_changes(raft::server_id id, const std::vector<raft::command_cref>&
};

// Wait for leader log to propagate to follower
-future<> raft_cluster::wait_log(size_t follower) {
+template <typename Clock>
+future<> raft_cluster<Clock>::wait_log(size_t follower) {
if ((*_connected)(to_raft_id(_leader), to_raft_id(follower)) &&
_in_configuration.contains(_leader) && _in_configuration.contains(follower)) {
auto leader_log_idx_term = _servers[_leader].server->log_last_idx_term();
@@ -823,7 +850,8 @@ future<> raft_cluster::wait_log(size_t follower) {
}

// Wait for leader log to propagate to specified followers
-future<> raft_cluster::wait_log(::wait_log followers) {
+template <typename Clock>
+future<> raft_cluster<Clock>::wait_log(::wait_log followers) {
auto leader_log_idx_term = _servers[_leader].server->log_last_idx_term();
for (auto s: followers.local_ids) {
co_await _servers[s].server->wait_log_idx_term(leader_log_idx_term);
@@ -831,7 +859,8 @@ future<> raft_cluster::wait_log(::wait_log followers) {
}

// Wait for all connected followers to catch up
-future<> raft_cluster::wait_log_all() {
+template <typename Clock>
+future<> raft_cluster<Clock>::wait_log_all() {
auto leader_log_idx_term = _servers[_leader].server->log_last_idx_term();
for (size_t s = 0; s < _servers.size(); ++s) {
if (s != _leader && (*_connected)(to_raft_id(s), to_raft_id(_leader)) &&
@@ -841,13 +870,15 @@ future<> raft_cluster::wait_log_all() {
}
}

-void raft_cluster::elapse_elections() {
+template <typename Clock>
+void raft_cluster<Clock>::elapse_elections() {
for (auto s: _in_configuration) {
_servers[s].server->elapse_election();
}
}

-future<> raft_cluster::elect_new_leader(size_t new_leader) {
+template <typename Clock>
+future<> raft_cluster<Clock>::elect_new_leader(size_t new_leader) {
BOOST_CHECK_MESSAGE(new_leader < _servers.size(),
format("Wrong next leader value {}", new_leader));

@@ -933,7 +964,8 @@ future<> raft_cluster::elect_new_leader(size_t new_leader) {

// Run a free election of nodes in configuration
// NOTE: there should be enough nodes capable of participating
-future<> raft_cluster::free_election() {
+template <typename Clock>
+future<> raft_cluster<Clock>::free_election() {
tlogger.debug("Running free election");
elapse_elections();
size_t node = 0;
@@ -951,7 +983,8 @@ future<> raft_cluster::free_election() {
}
}

-future<> raft_cluster::change_configuration(set_config sc) {
+template <typename Clock>
+future<> raft_cluster<Clock>::change_configuration(set_config sc) {
BOOST_CHECK_MESSAGE(sc.size() > 0, "Empty configuration change not supported");
raft::server_address_set set;
std::unordered_set<size_t> new_config;
@@ -1009,7 +1042,8 @@ future<> raft_cluster::change_configuration(set_config sc) {
_in_configuration = new_config;
}

-future<> raft_cluster::check_rpc_config(::check_rpc_config cc) {
+template <typename Clock>
+future<> raft_cluster<Clock>::check_rpc_config(::check_rpc_config cc) {
auto as = address_set(cc.addrs);
for (auto& node: cc.nodes) {
BOOST_CHECK(node.id < _servers.size());
@@ -1019,7 +1053,8 @@ future<> raft_cluster::check_rpc_config(::check_rpc_config cc) {
}
}

-void raft_cluster::check_rpc_added(::check_rpc_added expected) const {
+template <typename Clock>
+void raft_cluster<Clock>::check_rpc_added(::check_rpc_added expected) const {
for (auto node: expected.nodes) {
BOOST_CHECK_MESSAGE(_servers[node.id].rpc->servers_added() == expected.expected,
format("RPC added {} does not match expected {}",
@@ -1027,7 +1062,8 @@ void raft_cluster::check_rpc_added(::check_rpc_added expected) const {
}
}

-void raft_cluster::check_rpc_removed(::check_rpc_removed expected) const {
+template <typename Clock>
+void raft_cluster<Clock>::check_rpc_removed(::check_rpc_removed expected) const {
for (auto node: expected.nodes) {
BOOST_CHECK_MESSAGE(_servers[node.id].rpc->servers_removed() == expected.expected,
format("RPC removed {} does not match expected {}",
@@ -1035,13 +1071,15 @@ void raft_cluster::check_rpc_removed(::check_rpc_removed expected) const {
}
}

-void raft_cluster::rpc_reset_counters(::rpc_reset_counters nodes) {
+template <typename Clock>
+void raft_cluster<Clock>::rpc_reset_counters(::rpc_reset_counters nodes) {
for (auto node: nodes) {
_servers[node.id].rpc->reset_counters();
}
}

-future<> raft_cluster::reconfigure_all() {
+template <typename Clock>
+future<> raft_cluster<Clock>::reconfigure_all() {
if (_in_configuration.size() < _servers.size()) {
set_config sc;
for (size_t s = 0; s < _servers.size(); ++s) {
@@ -1051,7 +1089,8 @@ future<> raft_cluster::reconfigure_all() {
}
}

-future<> raft_cluster::partition(::partition p) {
+template <typename Clock>
+future<> raft_cluster<Clock>::partition(::partition p) {
std::unordered_set<size_t> partition_servers;
std::optional<size_t> next_leader;
for (auto s: p) {
@@ -1094,7 +1133,8 @@ future<> raft_cluster::partition(::partition p) {
restart_tickers();
}

-future<> raft_cluster::tick(::tick t) {
+template <typename Clock>
+future<> raft_cluster<Clock>::tick(::tick t) {
for (uint64_t i = 0; i < t.ticks; i++) {
for (auto&& s: _servers) {
s.server->tick();
@@ -1103,19 +1143,23 @@ future<> raft_cluster::tick(::tick t) {
}
}

-future<> raft_cluster::stop(::stop server) {
+template <typename Clock>
+future<> raft_cluster<Clock>::stop(::stop server) {
co_await stop_server(server.id);
}

-future<> raft_cluster::reset(::reset server) {
+template <typename Clock>
+future<> raft_cluster<Clock>::reset(::reset server) {
co_await reset_server(server.id, server.state);
}

-void raft_cluster::disconnect(::disconnect nodes) {
+template <typename Clock>
+void raft_cluster<Clock>::disconnect(::disconnect nodes) {
_connected->cut(to_raft_id(nodes.first), to_raft_id(nodes.second));
}

-void raft_cluster::verify() {
+template <typename Clock>
+void raft_cluster<Clock>::verify() {
BOOST_TEST_MESSAGE("Verifying hashes match expected (snapshot and apply calls)");
auto expected = hasher_int::hash_range(_apply_entries).finalize_uint64();
for (auto i: _in_configuration) {
@@ -1135,7 +1179,8 @@ void raft_cluster::verify() {
}
}

-std::vector<initial_state> raft_cluster::get_states(test_case test, bool prevote) {
+template <typename Clock>
+std::vector<initial_state> raft_cluster<Clock>::get_states(test_case test, bool prevote) {
std::vector<initial_state> states(test.nodes); // Server initial states

size_t leader = test.initial_leader;
@@ -1166,83 +1211,87 @@ std::vector<initial_state> raft_cluster::get_states(test_case test, bool prevote
return states;
}

-future<> run_test(test_case test, bool prevote, bool packet_drops,
- lowres_clock::duration tick_delta) {
-
- raft_cluster rafts(test, apply_changes, test.total_values,
- test.get_first_val(), test.initial_leader, prevote, packet_drops,
- tick_delta);
+template <typename Clock>
+struct run_test {
+ future<> operator() (test_case test, bool prevote, bool packet_drops,
+ typename Clock::duration tick_delta) {
+
+ raft_cluster<Clock> rafts(test, ::apply_changes, test.total_values,
+ test.get_first_val(), test.initial_leader, prevote, packet_drops,
+ tick_delta);
- ), std::move(update));
- }

- // Reconnect and bring all nodes back into configuration, if needed
- rafts.connect_all();
- co_await rafts.reconfigure_all();
+ // Reconnect and bring all nodes back into configuration, if needed
+ rafts.connect_all();
+ co_await rafts.reconfigure_all();

- if (test.total_values > 0) {
- BOOST_TEST_MESSAGE("Appending remaining values");
- co_await rafts.add_remaining_entries();
- co_await rafts.wait_all();
- }
+ if (test.total_values > 0) {
+ BOOST_TEST_MESSAGE("Appending remaining values");
+ co_await rafts.add_remaining_entries();
+ co_await rafts.wait_all();
+ }

- co_await rafts.stop_all();
+ co_await rafts.stop_all();

- if (test.total_values > 0) {
- rafts.verify();
+ if (test.total_values > 0) {
+ rafts.verify();
+ }
}
-}
+};

+template <typename Clock>
void replication_test(struct test_case test, bool prevote, bool packet_drops,
- lowres_clock::duration tick_delta) {
- run_test(std::move(test), prevote, packet_drops, tick_delta).get();
+ typename Clock::duration tick_delta) {
+ run_test<Clock>{}(std::move(test), prevote, packet_drops, tick_delta).get();
}
diff --git a/test/raft/replication_test.cc b/test/raft/replication_test.cc
index ad6d7b1445..b407ff9ad0 100644
--- a/test/raft/replication_test.cc
+++ b/test/raft/replication_test.cc
@@ -28,13 +28,13 @@ lowres_clock::duration tick_delta = 1ms;

#define RAFT_TEST_CASE(test_name, test_body) \
SEASTAR_THREAD_TEST_CASE(test_name) { \
- replication_test(test_body, false, false, tick_delta); } \
+ replication_test<lowres_clock>(test_body, false, false, tick_delta); } \
SEASTAR_THREAD_TEST_CASE(test_name ## _drops) { \
- replication_test(test_body, false, true, tick_delta); } \
+ replication_test<lowres_clock>(test_body, false, true, tick_delta); } \
SEASTAR_THREAD_TEST_CASE(test_name ## _prevote) { \
- replication_test(test_body, true, false, tick_delta); } \
+ replication_test<lowres_clock>(test_body, true, false, tick_delta); } \
SEASTAR_THREAD_TEST_CASE(test_name ## _prevote_drops) { \
- replication_test(test_body, true, true, tick_delta); }
+ replication_test<lowres_clock>(test_body, true, true, tick_delta); }

// 1 nodes, simple replication, empty, no updates
RAFT_TEST_CASE(simple_replication, (test_case{
@@ -201,7 +201,7 @@ RAFT_TEST_CASE(drops_04_dueling_repro, (test_case{

// TODO: change to RAFT_TEST_CASE once it's stable for handling packet drops
SEASTAR_THREAD_TEST_CASE(test_take_snapshot_and_stream) {
- replication_test(
+ replication_test<lowres_clock>(
// Snapshot automatic take and load
{.nodes = 3,
.config = {{.snapshot_threshold = 10, .snapshot_trailing = 5}},
@@ -223,7 +223,7 @@ RAFT_TEST_CASE(conf_changes_2, (test_case{

// Check removing a node from configuration, adding entries; cycle for all combinations
SEASTAR_THREAD_TEST_CASE(remove_node_cycle) {
- replication_test(
+ replication_test<lowres_clock>(
{.nodes = 4,
.updates = {set_config{0,1,2}, entries{2}, new_leader{1},
set_config{1,2,3}, entries{2}, new_leader{2},
@@ -235,7 +235,7 @@ SEASTAR_THREAD_TEST_CASE(remove_node_cycle) {
}

SEASTAR_THREAD_TEST_CASE(test_leader_change_during_snapshot_transfere) {
- replication_test(
+ replication_test<lowres_clock>(
{.nodes = 3,
.initial_snapshots = {{.snap = {.idx = raft::index_t(10),
.term = raft::term_t(1),
@@ -547,4 +547,3 @@ RAFT_TEST_CASE(rpc_configuration_truncate_restore_from_log, (test_case{
check_rpc_config{{node_id{0},node_id{1},node_id{2}},
rpc_address_set{node_id{0},node_id{1},node_id{2}}},
}}));
-
--
2.31.1

Alejo Sanchez

Jul 23, 2021, 5:15:10 AM
to scylla...@googlegroups.com, kos...@scylladb.com, gl...@scylladb.com, tgra...@scylladb.com, Alejo Sanchez
Support disconnecting a pair of nodes from each other, or one node from the rest.

Signed-off-by: Alejo Sanchez <alejo....@scylladb.com>
---
test/raft/replication.hh | 34 +++++++++++++++++++++++++++-------
1 file changed, 27 insertions(+), 7 deletions(-)
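
As a usage sketch (a hypothetical update sequence, not one added by this
patch), the two variants look like:

    .updates = {entries{1},
                disconnect2{{0, 1}},  // cut the 0 <-> 1 link in both directions
                entries{2},
                disconnect1{2},       // isolate node 2 from all other nodes
                entries{2}}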

diff --git a/test/raft/replication.hh b/test/raft/replication.hh
index 35f647f4e6..d3645d31e1 100644
--- a/test/raft/replication.hh
+++ b/test/raft/replication.hh
@@ -142,12 +142,18 @@ struct leader {
};
using partition = std::vector<std::variant<leader,int>>;

+
+// Disconnect a node from the rest
+struct disconnect1 {
+ size_t id;
+};
+
// Disconnect 2 servers both ways
struct two_nodes {
size_t first;
size_t second;
};
-struct disconnect : public two_nodes {};
+struct disconnect2 : public two_nodes {};

struct stop {
size_t id;
@@ -213,9 +219,9 @@ struct tick {
uint64_t ticks;
};

-using update = std::variant<entries, new_leader, partition, disconnect, stop, reset, wait_log,
- set_config, check_rpc_config, check_rpc_added, check_rpc_removed, rpc_reset_counters,
- tick>;
+using update = std::variant<entries, new_leader, partition, disconnect1, disconnect2,
+ stop, reset, wait_log, set_config, check_rpc_config, check_rpc_added,
+ check_rpc_removed, rpc_reset_counters, tick>;

struct log_entry {
unsigned term;
@@ -346,7 +352,8 @@ class raft_cluster {
future<> tick(::tick t);
future<> stop(::stop server);
future<> reset(::reset server);
- void disconnect(::disconnect nodes);
+ void disconnect(::disconnect2 nodes);
+ future<> disconnect(::disconnect1 nodes);
void verify();
private:
test_server create_server(size_t id, initial_state state);
@@ -1154,10 +1161,20 @@ future<> raft_cluster<Clock>::reset(::reset server) {
}

template <typename Clock>
-void raft_cluster<Clock>::disconnect(::disconnect nodes) {
+void raft_cluster<Clock>::disconnect(::disconnect2 nodes) {
_connected->cut(to_raft_id(nodes.first), to_raft_id(nodes.second));
}

+template <typename Clock>
+future<> raft_cluster<Clock>::disconnect(::disconnect1 node) {
+ _connected->disconnect(to_raft_id(node.id));
+ if (node.id == _leader) {
+ _servers[_leader].server->elapse_election(); // make old leader step down
+ co_await free_election();
+ }
+ co_return;
+}
+
template <typename Clock>
void raft_cluster<Clock>::verify() {
BOOST_TEST_MESSAGE("Verifying hashes match expected (snapshot and apply calls)");
@@ -1232,10 +1249,13 @@ struct run_test {
[&rafts] (new_leader update) -> future<> {
co_await rafts.elect_new_leader(update.id);
},
- [&rafts] (disconnect update) -> future<> {
+ [&rafts] (disconnect2 update) -> future<> {
rafts.disconnect(update);
co_return;
},
+ [&rafts] (::disconnect1 update) -> future<> {
+ co_await rafts.disconnect(update);
+ },
[&rafts] (partition update) -> future<> {

Alejo Sanchez

Jul 23, 2021, 5:15:12 AM
to scylla...@googlegroups.com, kos...@scylladb.com, gl...@scylladb.com, tgra...@scylladb.com, Alejo Sanchez
Allow specifying ranges within a partition to handle a large number of
nodes.

Signed-off-by: Alejo Sanchez <alejo....@scylladb.com>
---
test/raft/replication.hh | 18 ++++++++++++++----
1 file changed, 14 insertions(+), 4 deletions(-)
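
For example (hypothetical sizes), a large cluster can now be split without
listing every node id:

    // keep the leader plus nodes 1..299 connected; cut off the rest
    partition{leader{0}, range{1, 299}}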

diff --git a/test/raft/replication.hh b/test/raft/replication.hh
index d3645d31e1..6a48c8de2b 100644
--- a/test/raft/replication.hh
+++ b/test/raft/replication.hh
@@ -140,7 +140,12 @@ struct new_leader {
struct leader {
size_t id;
};
-using partition = std::vector<std::variant<leader,int>>;
+// Inclusive range
+struct range {
+ size_t start;
+ size_t end;
+};
+using partition = std::vector<std::variant<leader,range,int>>;


// Disconnect a node from the rest
@@ -1104,11 +1109,16 @@ future<> raft_cluster<Clock>::partition(::partition p) {
size_t id;
if (std::holds_alternative<struct leader>(s)) {
next_leader = std::get<struct leader>(s).id;
- id = *next_leader;
+ partition_servers.insert(*next_leader);
+ } else if (std::holds_alternative<struct range>(s)) {
+ auto range = std::get<struct range>(s);
+ for (size_t id = range.start; id <= range.end; id++) {
+ assert(id < _servers.size());
+ partition_servers.insert(id);
+ }
} else {
- id = std::get<int>(s);
+ partition_servers.insert(std::get<int>(s));
}
- partition_servers.insert(id);
}
if (next_leader) {
// Wait for log to propagate to next leader, before disconnections
--
2.31.1

Alejo Sanchez

Jul 23, 2021, 5:15:13 AM
to scylla...@googlegroups.com, kos...@scylladb.com, gl...@scylladb.com, tgra...@scylladb.com, Alejo Sanchez
Fix restart of tickers when handling a partition: restart them before running
a free election so the remaining nodes can campaign, and skip the redundant
restart after elect_new_leader(), which already restarts them.

Signed-off-by: Alejo Sanchez <alejo....@scylladb.com>
---
test/raft/replication.hh | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/test/raft/replication.hh b/test/raft/replication.hh
index 6a48c8de2b..0aa2651689 100644
--- a/test/raft/replication.hh
+++ b/test/raft/replication.hh
@@ -1145,9 +1145,11 @@ future<> raft_cluster<Clock>::partition(::partition p) {
co_await elect_new_leader(*next_leader);
} else if (partition_servers.find(_leader) == partition_servers.end() && p.size() > 0) {
// Old leader disconnected and not specified new, free election
+ restart_tickers();
co_await free_election();
+ } else {
+ restart_tickers();
}
- restart_tickers();
}

template <typename Clock>
--
2.31.1

Alejo Sanchez

Jul 23, 2021, 5:15:14 AM
to scylla...@googlegroups.com, kos...@scylladb.com, gl...@scylladb.com, tgra...@scylladb.com, Alejo Sanchez
Seastar's lowres_clock has a minimum granularity of 10ms, not 1ms, so use
10ms as the default tick delta.

Signed-off-by: Alejo Sanchez <alejo....@scylladb.com>
---
test/raft/replication_test.cc | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/test/raft/replication_test.cc b/test/raft/replication_test.cc
index b407ff9ad0..a506c3f2be 100644
--- a/test/raft/replication_test.cc
+++ b/test/raft/replication_test.cc
@@ -24,7 +24,7 @@
// Test Raft library with declarative test definitions


-lowres_clock::duration tick_delta = 1ms;
+lowres_clock::duration tick_delta = 10ms; // minimum granularity of lowres_clock

Alejo Sanchez

Jul 23, 2021, 5:15:16 AM
to scylla...@googlegroups.com, kos...@scylladb.com, gl...@scylladb.com, tgra...@scylladb.com, Alejo Sanchez
Allow test-supplied delays for RPC communication.

Allow supplying a network delay, a local delay (between nodes within the
same server), how many nodes count as local, and a small extra randomized
delay simulating local load.

Use a special rpc class that no longer calls the other node's server code
directly but schedules the call to run later. This makes the test more
realistic: in the previous version the first candidate always reached all
followers first, so there was no dueling.

Previously all tickers were scheduled at the same time, so they were not
spread across the tick period. Now they are scheduled with a uniform spread
across the tick delta.

For custom free elections, use normal ticks and check every tick delay
whether there is a new leader, instead of ticking manually. This is more
realistic.

There are a couple more debugging lines, including one needed to avoid a
miscompilation where the delays were not passed properly.

Signed-off-by: Alejo Sanchez <alejo....@scylladb.com>
---
test/raft/replication.hh | 228 ++++++++++++++++++++++++++++++++++-----
1 file changed, 201 insertions(+), 27 deletions(-)
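
A sketch of the new knobs (values purely illustrative; the cluster takes
them through a new optional constructor argument, as in the diff below):

    delays d{
        .network_delay = 2ms,    // cross-server hop, a fraction of a tick
        .local_delay = 1ms,      // between nodes on the same server
        .local_nodes = 32,       // nodes (shards) per server, power of 2
        .extra_delay_max = 500,  // up to 500us of extra random delay per rpc
    };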

diff --git a/test/raft/replication.hh b/test/raft/replication.hh
index 0aa2651689..492df29bf4 100644
--- a/test/raft/replication.hh
+++ b/test/raft/replication.hh
@@ -22,7 +22,9 @@
#pragma once

#include <random>
+#include <bit>
#include <seastar/core/app-template.hh>
+#include <seastar/core/gate.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/loop.hh>
@@ -287,6 +289,13 @@ raft::snapshot_id delay_apply_snapshot{utils::UUID(0, 0xdeadbeaf)};
// sending of a snaphot with that id will be delayed until snapshot_sync is signaled
raft::snapshot_id delay_send_snapshot{utils::UUID(0xdeadbeaf, 0)};

+struct delays {
+ std::chrono::milliseconds network_delay; // should be a fraction of a tick
+ std::chrono::milliseconds local_delay; // same host latency
+ size_t local_nodes = 32; // i.e. cores per server (must be power of 2)
+ size_t extra_delay_max = 500; // extra randomized per rpc delay (us)
+};
+
template <typename Clock>
class raft_cluster {
using apply_fn = std::function<size_t(raft::server_id id, const std::vector<raft::command_cref>& commands, lw_shared_ptr<hasher_int> hasher)>;
@@ -295,6 +304,7 @@ class raft_cluster {
class connected;
class failure_detector;
class rpc;
+ class rpc_with_delays;
struct test_server {
std::unique_ptr<raft::server> server;
state_machine* sm;
@@ -314,12 +324,18 @@ class raft_cluster {
size_t _leader;
std::vector<initial_state> get_states(test_case test, bool prevote);
typename Clock::duration _tick_delta;
+ std::optional<struct delays> _delays;
+ // Tick phase delay for each node, uniformly spread across tick delta
+ std::optional<std::vector<typename Clock::duration>> _tick_delays;
+ std::optional<std::uniform_int_distribution<int>> _rpc_extra_delay_dist;
+ std::mt19937 _rpc_extra_delay_gen;
public:
raft_cluster(test_case test,
apply_fn apply,
size_t apply_entries, size_t first_val, size_t first_leader,
bool prevote, bool packet_drops,
- typename Clock::duration tick_delta);
+ typename Clock::duration tick_delta,
+ std::optional<struct delays> delays = std::nullopt);
// No copy
raft_cluster(const raft_cluster&) = delete;
raft_cluster(raft_cluster&&) = default;
@@ -337,11 +353,12 @@ class raft_cluster {
void elapse_elections();
future<> elect_new_leader(size_t new_leader);
future<> free_election();
- void init_raft_tickers();
+ future<> init_raft_tickers();
void pause_tickers();
- void restart_tickers();
+ future<> restart_tickers();
void cancel_ticker(size_t id);
void set_ticker_callback(size_t id) noexcept;
+ void init_tick_delays();
future<> add_entries(size_t n);
future<> add_remaining_entries();
future<> wait_log(size_t follower);
@@ -522,6 +539,7 @@ class raft_cluster<Clock>::failure_detector : public raft::failure_detector {

template <typename Clock>
class raft_cluster<Clock>::rpc : public raft::rpc {
+protected:
static std::unordered_map<raft::server_id, rpc*> net;
raft::server_id _id;
connected* _connected;
@@ -627,6 +645,110 @@ class raft_cluster<Clock>::rpc : public raft::rpc {
uint32_t servers_removed() const {
return _servers_removed;
}
+ friend raft_cluster<Clock>::rpc_with_delays;
+};
+
+template <typename Clock>
+class raft_cluster<Clock>::rpc_with_delays : public raft_cluster<Clock>::rpc {
+ // Used to ensure that when `abort()` returns there are
+ // no more in-progress methods running on this object.
+ seastar::gate _gate;
+ struct delays& _delays;
+ std::uniform_int_distribution<int>& _dist;
+ std::mt19937& _gen;
+ // prefix mask for nodes (shards) per server
+ uint64_t _local_mask;
+
+public:
+ rpc_with_delays(raft::server_id id, connected* connected, snapshots* snapshots,
+ bool packet_drops, struct delays& delays,
+ std::uniform_int_distribution<int>& dist,
+ std::mt19937& gen) :
+ rpc(id, connected, snapshots, packet_drops),
+ _delays(delays), _dist(dist), _gen(gen)
+ {
+ rpc::net[rpc::_id] = this;
+ // Prefix mask: group node ids into buckets of local_nodes (assumed power of 2)
+ _local_mask = ~uint64_t(_delays.local_nodes - 1);
+ }
+ typename Clock::duration get_delay(raft::server_id id) {
+ if ((to_local_id(rpc::_id.id) & _local_mask) == (to_local_id(id.id) & _local_mask)) {
+ return _delays.local_delay;
+ } else {
+ return _delays.network_delay;
+ }
+ }
+
+ auto rand_extra_delay() {
+ return _dist(_gen) * 1us;
+ }
+
+ virtual future<> send_append_entries(raft::server_id id, const raft::append_request& append_request) {
+ if (!rpc::net.count(id)) {
+ return make_exception_future(std::runtime_error("trying to send a message to an unknown node"));
+ }
+ if (!(*rpc::_connected)(id, rpc::_id)) {
+ return make_exception_future<>(std::runtime_error("cannot send append since nodes are disconnected"));
+ }
+ if (!rpc::_packet_drops || (rand() % 5)) {
+ return with_gate(_gate, [&, this] () mutable -> future<> {
+ return seastar::sleep(get_delay(id) + rand_extra_delay()).then(
+ [this, id = std::move(id), append_request = std::move(append_request)] {
+ rpc::net[id]->_client->append_entries(rpc::_id, append_request);
+ });
+ });
+ }
+ return make_ready_future<>();
+ }
+ virtual void send_append_entries_reply(raft::server_id id, const raft::append_reply& reply) {
+ if (!rpc::net.count(id)) {
+ return;
+ }
+ if (!(*rpc::_connected)(id, rpc::_id)) {
+ return;
+ }
+ if (!rpc::_packet_drops || (rand() % 5)) {
+ (void)with_gate(_gate, [&, this] () mutable -> future<> {
+ return seastar::sleep(get_delay(id) + rand_extra_delay()).then(
+ [this, id = std::move(id), reply = std::move(reply)] {
+ rpc::net[id]->_client->append_entries_reply(rpc::_id, std::move(reply));
+ });
+ });
+ }
+ }
+ virtual void send_vote_request(raft::server_id id, const raft::vote_request& vote_request) {
+ if (!rpc::net.count(id)) {
+ return;
+ }
+ if (!(*rpc::_connected)(id, rpc::_id)) {
+ return;
+ }
+ (void)with_gate(_gate, [&, this] () mutable -> future<> {
+ auto extra_delay = rand_extra_delay();
+ return seastar::sleep(get_delay(id) + extra_delay).then(
+ [this, id = std::move(id), vote_request = std::move(vote_request)] {
+ rpc::net[id]->_client->request_vote(rpc::_id, std::move(vote_request));
+ });
+ });
+ }
+ virtual void send_vote_reply(raft::server_id id, const raft::vote_reply& vote_reply) {
+ if (!rpc::net.count(id)) {
+ return;
+ }
+ if (!(*rpc::_connected)(id, rpc::_id)) {
+ return;
+ }
+ (void)with_gate(_gate, [&, this] () mutable -> future<> {
+ auto extra_delay = rand_extra_delay();
+ return seastar::sleep(get_delay(id) + extra_delay).then([=, this] {
+ rpc::net[id]->_client->request_vote_reply(rpc::_id, vote_reply);
+ });
+ });
+ }
+
+ virtual future<> abort() {
+ return _gate.close();
+ }
};

template <typename Clock>
@@ -639,8 +761,15 @@ typename raft_cluster<Clock>::test_server raft_cluster<Clock>::create_server(siz
auto sm = std::make_unique<state_machine>(uuid, _apply, _apply_entries, _snapshots.get());
auto& rsm = *sm;

- auto mrpc = std::make_unique<raft_cluster::rpc>(uuid, _connected.get(),
- _snapshots.get(), _packet_drops);
+ std::unique_ptr<raft_cluster::rpc> mrpc;
+ if (_tick_delays) {
+ mrpc = std::make_unique<raft_cluster::rpc_with_delays>(uuid, _connected.get(),
+ _snapshots.get(), _packet_drops, *_delays,
+ *_rpc_extra_delay_dist, _rpc_extra_delay_gen);
+ } else {
+ mrpc = std::make_unique<raft_cluster::rpc>(uuid, _connected.get(),
+ _snapshots.get(), _packet_drops);
+ }
auto& rpc_ref = *mrpc;

auto mpersistence = std::make_unique<persistence>(uuid, state,
@@ -662,7 +791,8 @@ raft_cluster<Clock>::raft_cluster(test_case test,
apply_fn apply,
size_t apply_entries, size_t first_val, size_t first_leader,
bool prevote, bool packet_drops,
- typename Clock::duration tick_delta) :
+ typename Clock::duration tick_delta,
+ std::optional<struct delays> delays) :
_connected(std::make_unique<struct connected>(test.nodes)),
_snapshots(std::make_unique<snapshots>()),
_persisted_snapshots(std::make_unique<persisted_snapshots>()),
@@ -672,7 +802,8 @@ raft_cluster<Clock>::raft_cluster(test_case test,
_prevote(prevote),
_apply(apply),
_leader(first_leader),
- _tick_delta(tick_delta) {
+ _tick_delta(tick_delta),
+ _delays(delays) {

rpc::reset_network();

@@ -688,6 +819,11 @@ raft_cluster<Clock>::raft_cluster(test_case test,
config.current.emplace(states[i].address);
}

+ if (_delays) {
+ init_tick_delays();
+ _rpc_extra_delay_dist = std::uniform_int_distribution<int>(0, (*_delays).extra_delay_max);
+ }
+
for (size_t i = 0; i < states.size(); i++) {
auto& s = states[i].address;
states[i].snapshot.config = config;
@@ -696,6 +832,19 @@ raft_cluster<Clock>::raft_cluster(test_case test,
}
}

+template <typename Clock>
+void raft_cluster<Clock>::init_tick_delays() {
+ std::uniform_int_distribution<size_t> dist(0, _tick_delta.count());
+ auto gen = random_generator();
+
+ _tick_delays = {};
+ (*_tick_delays).reserve(_servers.size());
+ for (size_t s = 0; s < _servers.size(); s++) {
+ auto delay = dist(gen);
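+ // delay is a plain count in [0, tick_delta.count()]; scale it back into a duration in [0, tick_delta]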
+ (*_tick_delays).push_back(delay * _tick_delta / _tick_delta.count());
+ }
+}
+
template <typename Clock>
future<> raft_cluster<Clock>::stop_server(size_t id) {
cancel_ticker(id);
@@ -717,7 +866,7 @@ future<> raft_cluster<Clock>::start_all() {
co_await parallel_for_each(_servers, [] (auto& r) {
return r.server->start();
});
- init_raft_tickers();
+ co_await init_raft_tickers();
BOOST_TEST_MESSAGE("Electing first leader " << _leader);
_servers[_leader].server->wait_until_candidate();
co_await _servers[_leader].server->wait_election_done();
@@ -780,14 +929,21 @@ future<> raft_cluster<Clock>::add_remaining_entries() {
}

template <typename Clock>
-void raft_cluster<Clock>::init_raft_tickers() {
+future<> raft_cluster<Clock>::init_raft_tickers() {
_tickers.resize(_servers.size());
// Only start tickers for servers in configuration
for (auto s: _in_configuration) {
- _tickers[s].arm_periodic(_tick_delta);
_tickers[s].set_callback([&, s] {
_servers[s].server->tick();
});
}
+ if (_tick_delays) {
+     co_await parallel_for_each(_in_configuration, [&] (size_t s) -> future<> {
+         co_await seastar::sleep((*_tick_delays)[s]);
+         _tickers[s].rearm_periodic(_tick_delta);
+     });
+ } else {
+     for (auto s: _in_configuration) {
+         _tickers[s].arm_periodic(_tick_delta);
+     }
+ }
}

@@ -799,9 +955,16 @@ void raft_cluster<Clock>::pause_tickers() {
}

template <typename Clock>
-void raft_cluster<Clock>::restart_tickers() {
- for (auto s: _in_configuration) {
- _tickers[s].rearm_periodic(_tick_delta);
+future<> raft_cluster<Clock>::restart_tickers() {
+ if (_tick_delays) {
+ co_await parallel_for_each(_in_configuration, [&] (size_t s) -> future<> {
+ co_await seastar::sleep((*_tick_delays)[s]);
+ _tickers[s].rearm_periodic(_tick_delta);
+ });
+ } else {
+ for (auto s: _in_configuration) {
+ _tickers[s].rearm_periodic(_tick_delta);
+ }
}
}

@@ -938,7 +1101,7 @@ future<> raft_cluster<Clock>::elect_new_leader(size_t new_leader) {

// Restore connections to the original setting
*_connected = prev_disconnected;
- restart_tickers();
+ co_await restart_tickers();
co_await wait_log_all();

} else { // not prevote
@@ -962,7 +1125,7 @@ future<> raft_cluster<Clock>::elect_new_leader(size_t new_leader) {
_connected->connect(to_raft_id(_leader));
// Disconnect old leader from all nodes except new leader
_connected->disconnect(to_raft_id(_leader), to_raft_id(new_leader));
- restart_tickers();
+ co_await restart_tickers();
co_await _servers[new_leader].server->wait_election_done();

// Restore connections to the original setting
@@ -979,15 +1142,14 @@ future<> raft_cluster<Clock>::elect_new_leader(size_t new_leader) {
template <typename Clock>
future<> raft_cluster<Clock>::free_election() {
tlogger.debug("Running free election");
- elapse_elections();
size_t node = 0;
- for (;;) {
- tick_all();
- co_await seastar::sleep(10us); // Wait for election rpc exchanges
+ size_t loops = 0;
+ for (;; loops++) {
+ co_await seastar::sleep(_tick_delta); // Wait for election rpc exchanges
// find if we have a leader
for (auto s: _in_configuration) {
if (_servers[s].server->is_leader()) {
- tlogger.debug("New leader {}", s);
+ tlogger.debug("New leader {} (in {} loops)", to_raft_id(s), loops);
_leader = s;
co_return;
}
@@ -1023,6 +1185,10 @@ future<> raft_cluster<Clock>::change_configuration(set_config sc) {
if (!_in_configuration.contains(s)) {
tlogger.debug("Starting node being re-added to configuration {}", s);
co_await reset_server(s, initial_state{.log = {}});
+
+ if (_tick_delays) {
+ co_await seastar::sleep((*_tick_delays)[s]);
+ }
_tickers[s].rearm_periodic(_tick_delta);
}
}
@@ -1103,6 +1269,7 @@ future<> raft_cluster<Clock>::reconfigure_all() {

template <typename Clock>
future<> raft_cluster<Clock>::partition(::partition p) {
+ tlogger.debug("partitioning");
std::unordered_set<size_t> partition_servers;
std::optional<size_t> next_leader;
for (auto s: p) {
@@ -1142,13 +1309,14 @@ future<> raft_cluster<Clock>::partition(::partition p) {
}
if (next_leader) {
// New leader specified, elect it
- co_await elect_new_leader(*next_leader);
+ co_await elect_new_leader(*next_leader); // restarts tickers
} else if (partition_servers.find(_leader) == partition_servers.end() && p.size() > 0) {
// Old leader disconnected and not specified new, free election
- restart_tickers();
+ co_await restart_tickers();
+ _servers[_leader].server->elapse_election(); // make old leader step down
co_await free_election();
} else {
- restart_tickers();
+ co_await restart_tickers();
}
}

@@ -1179,6 +1347,7 @@ void raft_cluster<Clock>::disconnect(::disconnect2 nodes) {

template <typename Clock>
future<> raft_cluster<Clock>::disconnect(::disconnect1 node) {
+ tlogger.debug("disconnecting {}", to_raft_id(node.id));
_connected->disconnect(to_raft_id(node.id));
if (node.id == _leader) {
_servers[_leader].server->elapse_election(); // make old leader step down
@@ -1243,11 +1412,14 @@ std::vector<initial_state> raft_cluster<Clock>::get_states(test_case test, bool
template <typename Clock>
struct run_test {
future<> operator() (test_case test, bool prevote, bool packet_drops,
- typename Clock::duration tick_delta) {
+ typename Clock::duration tick_delta,
+ std::optional<struct delays> delays = std::nullopt) {

+ // Log line needed to work around a miscompilation where delays was not passed to the next call
+ tlogger.debug("starting test with {}", delays? "delays" : "no delays");
raft_cluster<Clock> rafts(test, ::apply_changes, test.total_values,
test.get_first_val(), test.initial_leader, prevote, packet_drops,
- tick_delta);
+ tick_delta, delays);
co_await rafts.start_all();

BOOST_TEST_MESSAGE("Processing updates");
@@ -1324,6 +1496,8 @@ struct run_test {

template <typename Clock>
void replication_test(struct test_case test, bool prevote, bool packet_drops,
- typename Clock::duration tick_delta) {
- run_test<Clock>{}(std::move(test), prevote, packet_drops, tick_delta).get();
+ typename Clock::duration tick_delta,
+ std::optional<struct delays> delays = std::nullopt) {
+ run_test<Clock>{}(std::move(test), prevote, packet_drops,
+ std::move(tick_delta), std::move(delays)).get();
}
--
2.31.1

Alejo Sanchez

Jul 23, 2021, 5:15:18 AM
to scylla...@googlegroups.com, kos...@scylladb.com, gl...@scylladb.com, tgra...@scylladb.com, Alejo Sanchez
Tests now wait for regular ticks during elections, so remove the now-unused
tick_all helper.

Signed-off-by: Alejo Sanchez <alejo....@scylladb.com>
---
test/raft/replication.hh | 8 --------
1 file changed, 8 deletions(-)

diff --git a/test/raft/replication.hh b/test/raft/replication.hh
index 492df29bf4..7d7f6cdbfd 100644
--- a/test/raft/replication.hh
+++ b/test/raft/replication.hh
@@ -347,7 +347,6 @@ class raft_cluster {
future<> start_all();
future<> stop_all();
future<> wait_all();
- void tick_all();
void disconnect(size_t id, std::optional<raft::server_id> except = std::nullopt);
void connect_all();
void elapse_elections();
@@ -886,13 +885,6 @@ future<> raft_cluster<Clock>::wait_all() {
}
}

-template <typename Clock>
-void raft_cluster<Clock>::tick_all() {
- for (auto s: _in_configuration) {
- _servers[s].server->tick();
- }
-}
-
template <typename Clock>
void raft_cluster<Clock>::disconnect(size_t id, std::optional<raft::server_id> except) {
_connected->disconnect(to_raft_id(id), except);
--
2.31.1

Alejo Sanchez

Jul 23, 2021, 5:15:20 AM
to scylla...@googlegroups.com, kos...@scylladb.com, gl...@scylladb.com, tgra...@scylladb.com, Alejo Sanchez
Tests with many nodes and realistic timers and ticks.

Network delays are kept at a fraction of the tick period (e.g. a 20ms delay with a 100ms tick).

Tests with 600 or more nodes hang in debug mode.

Signed-off-by: Alejo Sanchez <alejo....@scylladb.com>
---
configure.py | 2 ++
test/raft/many_test.cc | 52 ++++++++++++++++++++++++++++++++++++++++++
2 files changed, 54 insertions(+)
create mode 100644 test/raft/many_test.cc

diff --git a/configure.py b/configure.py
index 4b9c369339..0b6fed1b68 100755
--- a/configure.py
+++ b/configure.py
@@ -553,6 +553,7 @@ perf_tests = set([
raft_tests = set([
'test/raft/replication_test',
'test/raft/randomized_nemesis_test',
+ 'test/raft/many_test',
'test/raft/fsm_test',
'test/raft/etcd_test',
'test/raft/raft_sys_table_storage_test',
@@ -1254,6 +1255,7 @@ deps['test/boost/alternator_unit_test'] += ['alternator/base64.cc']

deps['test/raft/replication_test'] = ['test/raft/replication_test.cc', 'test/raft/helpers.cc'] + scylla_raft_dependencies
deps['test/raft/randomized_nemesis_test'] = ['test/raft/randomized_nemesis_test.cc'] + scylla_raft_dependencies
+deps['test/raft/many_test'] = ['test/raft/many_test.cc', 'test/raft/helpers.cc'] + scylla_raft_dependencies
deps['test/raft/fsm_test'] = ['test/raft/fsm_test.cc', 'test/raft/helpers.cc', 'test/lib/log.cc'] + scylla_raft_dependencies
deps['test/raft/etcd_test'] = ['test/raft/etcd_test.cc', 'test/raft/helpers.cc', 'test/lib/log.cc'] + scylla_raft_dependencies
deps['test/raft/raft_sys_table_storage_test'] = ['test/raft/raft_sys_table_storage_test.cc'] + \
diff --git a/test/raft/many_test.cc b/test/raft/many_test.cc
new file mode 100644
index 0000000000..309deb5010
--- /dev/null
+++ b/test/raft/many_test.cc
@@ -0,0 +1,52 @@
+/*
+ * Copyright (C) 2021-present ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+// Test Raft library with many candidates
+//
+// Using the slower but more precise steady clock
+
+#include "replication.hh"
+
+using update = std::variant<entries, new_leader, partition, disconnect1, disconnect2,
+ stop, reset, wait_log, set_config, check_rpc_config, check_rpc_added,
+ check_rpc_removed, rpc_reset_counters, tick>;
+
+SEASTAR_THREAD_TEST_CASE(test_many_100) {
+ replication_test<steady_clock_type>(
+ {.nodes = 100, .total_values = 10,
+ .updates = {entries{1},
+ disconnect1{0}, // drop leader, free election
+ entries{2},
+ }}
+ , true, false, 100ms,
+ delays{ .network_delay = 20ms, .local_delay = 1ms });
+}
+
+SEASTAR_THREAD_TEST_CASE(test_many_400) {
+ replication_test<steady_clock_type>(
+ {.nodes = 400, .total_values = 10,
+ .updates = {entries{1},
+ disconnect1{0}, // drop leader, free election
+ entries{2},
+ }}
+ , true, false, 100ms,
+ delays{ .network_delay = 20ms, .local_delay = 1ms });
+}
--
2.31.1

Konstantin Osipov

Jul 23, 2021, 11:10:02 AM
to Alejo Sanchez, scylla...@googlegroups.com, gl...@scylladb.com, tgra...@scylladb.com
* Alejo Sanchez <alejo....@scylladb.com> [21/07/23 12:16]:
> Support disconnection of a pair of nodes or one with the rest.

I'd call it something like isolate(node_id) and partition(node_id,
node_id); disconnect1 and disconnect2 suggest that disconnect2 is
the same as disconnect1, just for two nodes.
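
For illustration, the suggested shape would be something like this
(hypothetical names, sketch only; "partition" is already taken by the
range type in the test, so the pair form would need its own name):

    // Hypothetical sketch of the suggested naming:
    struct isolate { size_t id; };                  // cut one node off from all others
    struct partition_pair { size_t a; size_t b; };  // sever the link between two nodes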


--
Konstantin Osipov, Moscow, Russia

Konstantin Osipov

Jul 23, 2021, 11:12:11 AM
to Alejo Sanchez, scylla...@googlegroups.com, gl...@scylladb.com, tgra...@scylladb.com
* Alejo Sanchez <alejo....@scylladb.com> [21/07/23 12:16]:
> Fix restart tickers for partition.

How did it work before? Why did the test pass even though
free_election was called without restarting tickers?

Konstantin Osipov

Jul 23, 2021, 11:52:40 AM
to Alejo Sanchez, scylla...@googlegroups.com, gl...@scylladb.com, tgra...@scylladb.com
* Alejo Sanchez <alejo....@scylladb.com> [21/07/23 12:16]:
> Allow test supplied delays for rpc communication.
>
> Allow supplying network delay, local delay (nodes within the same
> server), how many nodes are local, and an extra small delay simulating
> local load.
>
> Use a special rpc class. This class no longer directly calls the other
> node's server code but it schedules it to be called later. This makes
> the test more realistic as in the previous version the first candidate
> was always going to get to all followers first (no dueling).
>
> Previously tickers were all scheduled at the same time, so there was no
> spread of them across the tick time. Now these tickers are scheduled
> with a uniform spread across this time (tick delta).
>
> For custom free elections use normal ticks and check every tick delay if
> there is a new leader instead of ticking manually. This is more
> realistic.

I didn't quite understand this part: why is "checking every tick
delay if there is a new leader" "more realistic"?

> There are a couple more debugging lines including one needed to avoid a
> miscompilation for the delays not being passed properly.

Passed where?

> raft::snapshot_id delay_send_snapshot{utils::UUID(0xdeadbeaf, 0)};
>
> +struct delays {

I would appreciate a comment for every class.

> + std::chrono::milliseconds network_delay; // should be a fraction of a tick
> + std::chrono::milliseconds local_delay; // same host latency

Should it be a fraction of a tick?

> + size_t local_nodes = 32; // i.e. cores per server (must be power of 2)

Why?

> + size_t extra_delay_max = 500; // extra randomized per rpc delay (us)

Please clarify whether extra_delay_max itself is randomized, or whether the extra
delay is a random value within the range 0...extra_delay_max.
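
For reference, the constructor suggests the extra delay is a fresh uniform
draw in [0, extra_delay_max] microseconds per RPC, roughly (a sketch of the
apparent semantics, not the literal patch code):

    // One fresh uniform draw per RPC, in microseconds.
    // _gen: any standard random engine, e.g. std::mt19937 (assumption).
    std::uniform_int_distribution<int> extra_delay_dist(0, delays.extra_delay_max);
    std::chrono::microseconds rand_extra_delay() {
        return std::chrono::microseconds(extra_delay_dist(_gen));
    }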

> + std::optional<struct delays> _delays;

struct delays is not exactly the delays themselves; it's a
configuration for delays. Do you have some guidance for how to use it?

> + if (!rpc::_packet_drops || (rand() % 5)) {

Shouldn't the 5 (a 20% drop chance) be part of the config?
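
For illustration, such a knob could look like this (hypothetical
drop_one_in field, sketch only):

    struct rpc_config {
        size_t drop_one_in = 5;  // hypothetical: drop ~1 in N packets when drops are enabled
    };

    // In the send path:
    if (!_packet_drops || (rand() % _config.drop_one_in)) {
        // deliver the message
    }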

> + virtual void send_vote_request(raft::server_id id, const raft::vote_request& vote_request) {
> + if (!rpc::net.count(id)) {
> + return;
> + }
> + if (!(*rpc::_connected)(id, rpc::_id)) {
> + return;
> + }
> + (void)with_gate(_gate, [&, this] () mutable -> future<> {

Hate to bloat the scope of the patch, but vote_request/reply is not
subject to packet drops; is there a specific reason for that?

> + _tick_delta(tick_delta),
> + _delays(delays) {

Is there a reason to keep delays separate, perhaps put everything
into rpc_config?

> + if (_tick_delays) {
> + co_await parallel_for_each(_in_configuration, [&] (size_t s) -> future<> {
> + co_await seastar::sleep((*_tick_delays)[s]);
> + _tickers[s].rearm_periodic(_tick_delta);
> + });
> + } else {
> + _tickers[s].arm_periodic(_tick_delta);
> + }

Perhaps you could avoid the branching and optionals with just
seastar::sleep(0), but up to you.
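
For illustration, the unbranched form would look roughly like this (sketch,
assuming _tick_delays becomes a plain vector holding zero durations when no
spread is configured):

    co_await parallel_for_each(_in_configuration, [&] (size_t s) -> future<> {
        co_await seastar::sleep(_tick_delays[s]);  // sleep(0) when delays are disabled
        _tickers[s].rearm_periodic(_tick_delta);
    });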

> @@ -799,9 +955,16 @@ void raft_cluster<Clock>::pause_tickers() {
> }
>
> template <typename Clock>
> -void raft_cluster<Clock>::restart_tickers() {
> - for (auto s: _in_configuration) {
> - _tickers[s].rearm_periodic(_tick_delta);
> +future<> raft_cluster<Clock>::restart_tickers() {
> + if (_tick_delays) {
> + co_await parallel_for_each(_in_configuration, [&] (size_t s) -> future<> {
> + co_await seastar::sleep((*_tick_delays)[s]);
> + _tickers[s].rearm_periodic(_tick_delta);
> + });
> + } else {
> + for (auto s: _in_configuration) {
> + _tickers[s].rearm_periodic(_tick_delta);
> + }
> }
> @@ -1023,6 +1185,10 @@ future<> raft_cluster<Clock>::change_configuration(set_config sc) {
> if (!_in_configuration.contains(s)) {
> tlogger.debug("Starting node being re-added to configuration {}", s);
> co_await reset_server(s, initial_state{.log = {}});
> +
> + if (_tick_delays) {
> + co_await seastar::sleep((*_tick_delays)[s]);

I think if we look at each node's delay as a random walk, the dispersion
between different nodes should grow with more ticks. Basically, there is
a growing chance that (with more ticks) some node will tick much
more slowly than the majority of the nodes. Did you log the total
injected delay per node? Does it fluctuate around the median value or
walk away from it?
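
For intuition: this would hold if the per-tick delay were re-sampled on
every tick; the accumulated offset of one node would then be a random walk
with standard deviation sigma * sqrt(n), where sigma = tick / sqrt(12) for
a uniform draw (about 29ms * 10 = 289ms after 100 ticks of a 100ms tick),
and the min-max spread across many nodes grows to several times that. A
standalone sketch to check this (hypothetical, not part of the patch):

    #include <algorithm>
    #include <cstdio>
    #include <random>

    int main() {
        constexpr int n_nodes = 600, n_ticks = 100;
        constexpr double tick_ms = 100.0;
        std::mt19937 gen(42);
        std::uniform_real_distribution<double> dist(0.0, tick_ms);
        double lo = 1e18, hi = -1e18;
        for (int n = 0; n < n_nodes; n++) {
            double sum = 0;               // total injected delay for this node
            for (int t = 0; t < n_ticks; t++) {
                sum += dist(gen);         // fresh draw every tick: the random-walk case
            }
            lo = std::min(lo, sum);
            hi = std::max(hi, sum);
        }
        // Per-node sigma is sqrt(n_ticks) * tick_ms / sqrt(12) (~289ms here);
        // the min-max spread over 600 nodes is several times that.
        std::printf("spread: %.0f ms (~%.1f ticks)\n", hi - lo, (hi - lo) / tick_ms);
        return 0;
    }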

Konstantin Osipov

Jul 23, 2021, 11:54:00 AM
to Alejo Sanchez, scylla...@googlegroups.com, gl...@scylladb.com, tgra...@scylladb.com
* Alejo Sanchez <alejo....@scylladb.com> [21/07/23 12:16]:
> Tests with many nodes and realistic timers and ticks.
>
> Network delays are kept at a fraction of the tick period (e.g. a 20ms delay with a 100ms tick).
>
> Tests with 600 or more nodes hang in debug mode.

Could the accumulating difference between node tick speeds be the
culprit?

Alejo Sanchez

Jul 23, 2021, 1:16:34 PM
to Konstantin Osipov, Alejo Sanchez, scylladb-dev, Gleb Natapov, Tomasz Grabiec
On Fri, Jul 23, 2021 at 5:12 PM Konstantin Osipov <kos...@scylladb.com> wrote:
* Alejo Sanchez <alejo....@scylladb.com> [21/07/23 12:16]:
> Fix restart tickers for partition.

How did it work before? Why did the test pass even though
free_election was called without restarting tickers?

I will reword the commit message.

Before it was always restarting tickers. But elect_new_leader() already does it.

Alejo Sanchez

Jul 23, 2021, 1:59:59 PM
to Konstantin Osipov, Alejo Sanchez, scylladb-dev, Gleb Natapov, Tomasz Grabiec
On Fri, Jul 23, 2021 at 5:53 PM Konstantin Osipov <kos...@scylladb.com> wrote:
* Alejo Sanchez <alejo....@scylladb.com> [21/07/23 12:16]:
> Tests with many nodes and realistic timers and ticks.
>
> Network delays are kept at a fraction of the tick period (e.g. a 20ms delay with a 100ms tick).
>
> Tests with 600 or more nodes hang in debug mode.

Could the accumulating difference between node tick speeds be the
culprit?

For a 100ms tick delay (the normal value, used in the test) in debug mode, the test always hangs with 700 nodes.
For 200ms it sometimes hangs with 700 nodes and always hangs with 900 nodes.
Note that doubling the tick period halved the ticker density.

For 500ms it sometimes hangs with 900 nodes.

Also, the behavior is quite non-linear, so I don't think it's caused by an overloaded engine/scheduler.


Alejo Sanchez

Jul 23, 2021, 2:16:14 PM
to Konstantin Osipov, Alejo Sanchez, scylladb-dev, Gleb Natapov, Tomasz Grabiec
With the patch for prevote follower backoff the test passes with a 100ms tick and 700 nodes.
Combined with non-uniform candidate timeouts the test always passes up to ~800 nodes.

Note that when testing above 800 nodes you see some reactor stalls, but those occur before the test (likely setup) and after it (likely cleanup).
They are not related to the free election.

Alejo Sanchez

Jul 26, 2021, 9:01:42 AM
to Konstantin Osipov, Alejo Sanchez, scylladb-dev, Gleb Natapov, Tomasz Grabiec
On Fri, Jul 23, 2021 at 5:52 PM Konstantin Osipov <kos...@scylladb.com> wrote:
* Alejo Sanchez <alejo....@scylladb.com> [21/07/23 12:16]:
> @@ -1023,6 +1185,10 @@ future<> raft_cluster<Clock>::change_configuration(set_config sc) {
>          if (!_in_configuration.contains(s)) {
>              tlogger.debug("Starting node being re-added to configuration {}", s);
>              co_await reset_server(s, initial_state{.log = {}});
> +
> +            if (_tick_delays) {
> +                co_await seastar::sleep((*_tick_delays)[s]);

I think if we look at each node's delay as at a random walk, the dispersion
between different nodes should grow with more ticks. Basically there is
a growing chance some (with more ticks) some node will tick much
slower than the majority of the nodes. Did you log the total
injected delay per node, does it fluctuate around median value or
walks away from it? 

Ticks are very regular. I tried different things and never saw servers drift more than ~1 tick from the average; the injected delay is applied only once, before arming the periodic timer, so per-node offsets do not accumulate.

I also tried adding 200 entries in multiple waves.

Also tried with 10s sleep afterwards to see if Seastar itself would drift, but no:

debug mode sleep 10s
n 100: ticks per server: average 115.00, min 115, max 115
n 400: ticks per server: average 148.19, min 148, max 149
n 500: ticks per server: average 208.99, min 209, max 210

dev mode sleep 10s
n 100: ticks per server: average 112.00, min 112, max 112
n 400: ticks per server: average 114.00, min 114, max 114
n 500: ticks per server: average 114.00, min 114, max 114
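
For reference, numbers like these can be gathered with a trivial per-server
counter in the ticker callback (hypothetical instrumentation, not part of
the patch):

    // Count ticks per server; report average/min/max over tick_counts after the run.
    std::vector<uint64_t> tick_counts(_servers.size(), 0);
    _tickers[s].set_callback([&, s] {
        ++tick_counts[s];
        _servers[s].server->tick();
    });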

Alejo Sanchez

Jul 26, 2021, 11:32:49 AM
to scylla...@googlegroups.com, kos...@scylladb.com, gl...@scylladb.com, tgra...@scylladb.com, Alejo Sanchez
Factor out replication test, make it work with different clocks, add
some features, and add a many nodes test with steady_clock. Also
refactor common test helper.

Many nodes test passes with 600 nodes but with higher numbers it needs
prevote follower backoff and non-uniform candidate timeouts.

Branch URL: https://github.com/alecco/scylla/tree/raft-many-13-many-nodes-test

Tests: unit ({dev}), unit ({debug}), unit ({release})

Changes in v3:
- rename disconnect1 to isolate(node_id) @kostja
- reworded commit for restart_tickers in partitioning @kostja
- improve commit message for tick delays @kostja
- use struct rpc_config to pass test connectivity config @kostja
- no delays with network delay 0 instead of optional @kostja
- make dummy command const
- functions defined in replication.cc file
- helper for packet drops

Changes in v2:
- ticker delays
- rpc delays
- fix restart_tickers in partitioning
- fix minimum granularity in replication_test
- partition ranges
- remove unused tick_all()
- two tests in many nodes
- more logging
- rebase on latest master

Alejo Sanchez (16):
raft: log election stages
raft: testing: refactor helper
raft: replication test: move common code out
raft: replication test: tick delta inside raft_cluster
raft: replication test: template clock type
raft: replication test: make dummy command const
raft: replication test: move objects out of header
raft: replication test: isolate one server
raft: replication test: partition ranges
raft: replication test: fix restart_tickers when partitioning
raft: replication test: use minimum granularity
raft: replication test: connectivity configuration
raft: replication test: packet drop rpc helper
raft: replication test: delays
raft: replication test: remove unused tick_all
raft: testing: many nodes test

configure.py | 8 +-
raft/fsm.cc | 4 +
test/raft/etcd_test.cc | 2 +
test/raft/fsm_test.cc | 2 +
test/raft/{helpers.hh => helpers.cc} | 107 +-
test/raft/helpers.hh | 145 +-
test/raft/many_test.cc | 48 +
test/raft/replication.cc | 101 ++
.../{replication_test.cc => replication.hh} | 1167 ++++++----------
test/raft/replication_test.cc | 1241 +----------------
10 files changed, 627 insertions(+), 2198 deletions(-)
copy test/raft/{helpers.hh => helpers.cc} (63%)
create mode 100644 test/raft/many_test.cc
create mode 100644 test/raft/replication.cc
copy test/raft/{replication_test.cc => replication.hh} (51%)

--
2.31.1

Alejo Sanchez

Jul 26, 2021, 11:32:50 AM
to scylla...@googlegroups.com, kos...@scylladb.com, gl...@scylladb.com, tgra...@scylladb.com, Alejo Sanchez
Add logging for election tracing.

Signed-off-by: Alejo Sanchez <alejo....@scylladb.com>
---

Alejo Sanchez

Jul 26, 2021, 11:32:52 AM
to scylla...@googlegroups.com, kos...@scylladb.com, gl...@scylladb.com, tgra...@scylladb.com, Alejo Sanchez
Move definitions to helper object file.

Signed-off-by: Alejo Sanchez <alejo....@scylladb.com>
---
configure.py | 6 +-
test/raft/etcd_test.cc | 2 +
test/raft/fsm_test.cc | 2 +
test/raft/{helpers.hh => helpers.cc} | 107 ++------------------
test/raft/helpers.hh | 145 +++++----------------------
5 files changed, 39 insertions(+), 223 deletions(-)
copy test/raft/{helpers.hh => helpers.cc} (63%)

diff --git a/configure.py b/configure.py
index 94a2913d71..4b9c369339 100755
--- a/configure.py
+++ b/configure.py
@@ -1252,10 +1252,10 @@ deps['test/boost/linearizing_input_stream_test'] = [
deps['test/boost/duration_test'] += ['test/lib/exception_utils.cc']
deps['test/boost/alternator_unit_test'] += ['alternator/base64.cc']

-deps['test/raft/replication_test'] = ['test/raft/replication_test.cc'] + scylla_raft_dependencies
+deps['test/raft/replication_test'] = ['test/raft/replication_test.cc', 'test/raft/helpers.cc'] + scylla_raft_dependencies
deps['test/raft/randomized_nemesis_test'] = ['test/raft/randomized_nemesis_test.cc'] + scylla_raft_dependencies
-deps['test/raft/fsm_test'] = ['test/raft/fsm_test.cc', 'test/lib/log.cc'] + scylla_raft_dependencies
-deps['test/raft/etcd_test'] = ['test/raft/etcd_test.cc', 'test/lib/log.cc'] + scylla_raft_dependencies
+deps['test/raft/fsm_test'] = ['test/raft/fsm_test.cc', 'test/raft/helpers.cc', 'test/lib/log.cc'] + scylla_raft_dependencies
+deps['test/raft/etcd_test'] = ['test/raft/etcd_test.cc', 'test/raft/helpers.cc', 'test/lib/log.cc'] + scylla_raft_dependencies
deps['test/raft/raft_sys_table_storage_test'] = ['test/raft/raft_sys_table_storage_test.cc'] + \
scylla_core + scylla_tests_generic_dependencies
deps['test/raft/raft_address_map_test'] = ['test/raft/raft_address_map_test.cc'] + scylla_core
diff --git a/test/raft/etcd_test.cc b/test/raft/etcd_test.cc
index 872ba8ccf0..c051eb595f 100644
--- a/test/raft/etcd_test.cc
+++ b/test/raft/etcd_test.cc
@@ -35,6 +35,8 @@

// Port of etcd Raft implementation unit tests

+#define BOOST_TEST_MODULE raft
+
#include "test/raft/helpers.hh"

using namespace raft;
diff --git a/test/raft/fsm_test.cc b/test/raft/fsm_test.cc
index 299eb0a0d4..d9a8f4e1ae 100644
--- a/test/raft/fsm_test.cc
+++ b/test/raft/fsm_test.cc
@@ -19,6 +19,8 @@
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
- }
- }
- }
- }
- } while (has_traffic);
-}
+communicate_impl(std::function<bool()> stop_pred, raft_routing_map& map);

template <typename... Args>
void communicate_until(std::function<bool()> stop_pred, Args&&... args) {
@@ -212,44 +140,19 @@ raft::fsm* select_leader(Args&&... args) {
}


-raft::server_id id() {
- static int id = 0;
- return raft::server_id{utils::UUID(0, ++id)};
-}
-
-raft::server_address_set address_set(std::vector<raft::server_id> ids) {
- raft::server_address_set set;

Alejo Sanchez

Jul 26, 2021, 11:32:55 AM
to scylla...@googlegroups.com, kos...@scylladb.com, gl...@scylladb.com, tgra...@scylladb.com, Alejo Sanchez
Common replication test code moved to header.

Signed-off-by: Alejo Sanchez <alejo....@scylladb.com>
---
.../{replication_test.cc => replication.hh} | 525 +------
test/raft/replication_test.cc | 1221 +----------------
2 files changed, 3 insertions(+), 1743 deletions(-)
copy test/raft/{replication_test.cc => replication.hh} (64%)

diff --git a/test/raft/replication_test.cc b/test/raft/replication.hh
similarity index 64%
copy from test/raft/replication_test.cc
copy to test/raft/replication.hh
index 9e86821bea..902c990880 100644
--- a/test/raft/replication_test.cc
+++ b/test/raft/replication.hh
@@ -19,6 +19,8 @@
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/

+#pragma once
+
#include <random>
#include <seastar/core/app-template.hh>
#include <seastar/core/sleep.hh>
@@ -1241,526 +1243,3 @@ future<> run_test(test_case test, bool prevote, bool packet_drops) {
void replication_test(struct test_case test, bool prevote, bool packet_drops) {
run_test(std::move(test), prevote, packet_drops).get();
}
-
-#define RAFT_TEST_CASE(test_name, test_body) \
- SEASTAR_THREAD_TEST_CASE(test_name) { \
- replication_test(test_body, false, false); } \
-// TODO: change to RAFT_TEST_CASE once it's stable for handling packet drops
-SEASTAR_THREAD_TEST_CASE(test_take_snapshot_and_stream) {
- replication_test(
- // Snapshot automatic take and load
- {.nodes = 3,
- .config = {{.snapshot_threshold = 10, .snapshot_trailing = 5}},
- .updates = {entries{5}, partition{0,1}, entries{10}, partition{0, 2}, entries{20}}}
- , false, false);
- .updates = {tick{10} /* ticking starts snapshot transfer */, new_leader{1}, entries{10}}}
- , false, false);
- .snapshot = {.config = address_set({node_id{0},node_id{1},node_id{2},node_id{3}})
- }
- }},
-
- // A should observe RPC configuration = {A, B, C, D} since it's the union
- // of an uncommitted joint config components
- // {.current = {A, B, C, D}, .previous = {A, B, C}}.
- check_rpc_config{node_id{0},
- rpc_address_set{node_id{0},node_id{1},node_id{2},node_id{3}}},
-
- // Elect B as leader
- new_leader{1},
-
- // Heal network partition. Connect all.
- partition{0,1,2,3},
-
- // wait to synchronize logs between current leader (B) and the rest of the cluster
- wait_log{0,2},
-
- // A's RPC configuration is reverted to committed configuration {A, B, C}.
- check_rpc_config{{node_id{0},node_id{1},node_id{2}},
- rpc_address_set{node_id{0},node_id{1},node_id{2}}},
- }}));
-
diff --git a/test/raft/replication_test.cc b/test/raft/replication_test.cc
index 9e86821bea..4bdbb133ce 100644
--- a/test/raft/replication_test.cc
+++ b/test/raft/replication_test.cc
@@ -19,1228 +19,9 @@
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/

-#include <random>
-#include <seastar/core/app-template.hh>
-#include <seastar/core/sleep.hh>
-#include <seastar/core/coroutine.hh>
-#include <seastar/core/loop.hh>
-#include <seastar/util/log.hh>
-#include <seastar/util/later.hh>
-#include <seastar/util/variant_utils.hh>
-#include <seastar/testing/random.hh>
-#include <seastar/testing/thread_test_case.hh>
-#include <seastar/testing/test_case.hh>
-#include "raft/server.hh"
-#include "serializer.hh"
-#include "serializer_impl.hh"
-#include "xx_hasher.hh"
-#include "test/raft/helpers.hh"
-#include "test/lib/eventually.hh"
+#include "replication.hh"

// Test Raft library with declarative test definitions
-static seastar::logger tlogger("test");
-
-lowres_clock::duration tick_delta = 1ms;
-}
-
-raft::server_address_set address_set(std::vector<node_id> nodes) {
- return address_set(to_raft_id_vec(nodes));
-}
-
-// Updates can be
-// - Entries
-// - Leader change
-// - Configuration change
-struct entries {
- size_t n;
-};
-struct new_leader {
- size_t id;
-};
-struct leader {
- size_t id;
-};
-using partition = std::vector<std::variant<leader,int>>;
-using update = std::variant<entries, new_leader, partition, disconnect, stop, reset, wait_log,
- set_config, check_rpc_config, check_rpc_added, check_rpc_removed, rpc_reset_counters,
- tick>;
- std::unordered_set<size_t> _in_configuration; // Servers in current configuration
- std::vector<seastar::timer<lowres_clock>> _tickers;
- size_t _leader;
- std::vector<initial_state> get_states(test_case test, bool prevote);
-public:
- raft_cluster(test_case test,
- apply_fn apply,
- size_t apply_en