[PATCH v2 0/5] test: raft: generators infrastructure with an actual random nemesis test

Kamil Braun

<kbraun@scylladb.com>
Jul 7, 2021, 5:43:37 AM
to scylladb-dev@googlegroups.com, gleb@scylladb.com, kostja@scylladb.com, Kamil Braun
The Jepsen-inspired randomized nemesis test is here.

We introduce the concepts of "operations" and "generators", basic
building blocks that will allow us to declaratively write randomized
tests for torturing simulated Raft clusters.

An "operation" is a data structure representing a computation which
may cause side effects, such as calling a Raft cluster or partitioning
the network. In the code it is represented by the `Executable` concept.
An operation has an `execute` function which performs the computation and
returns a result of type `result_type`. Different computations of the same
type share state of type `state_type`. The state can, for example, contain
database handles.

Each execution is performed on an abstract "thread" (represented by a `thread_id`)
and has a logical start time point. The thread and start point together form
the execution's `context`, which is passed by reference to `execute`.

Two operations may be called in parallel only if they are on different threads.
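
To make the shape of an operation concrete, here is a minimal sketch. It is
not code from the patchset: `append_op` is a hypothetical operation, the time
point is a plain integer, and `execute` returns its result directly instead of
a Seastar future.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Simplified stand-ins (assumptions): the patch uses
// raft::logical_clock::time_point and `execute` returns a future.
struct thread_id { std::size_t id; };
using time_point = long;

struct context {
    thread_id thread;
    time_point start;
};

// Hypothetical operation: appends a value to a log shared by all executions.
struct append_op {
    using result_type = std::size_t;              // index where the value landed
    using state_type = std::vector<std::string>;  // shared between executions

    std::string value;

    result_type execute(state_type& s, const context&) {
        s.push_back(value);
        return s.size() - 1;
    }
};
```

Every `append_op` executed against the same `state_type` object sees the
entries stored by earlier executions, which is the role the shared state plays
for database handles in the real operations.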

A generator, represented through the `Generator` concept, produces a
sequence of operations. An operation can be fetched from a generator
using the `op` function, which also returns the next state of the
generator (generators are purely functional data structures).
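
The "purely functional" part can be sketched as follows. `counting_gen` is a
hypothetical generator, and the context argument that the real `fetch_op`
takes is left out to keep the example short.

```cpp
#include <utility>
#include <variant>

struct pending {};
struct finished {};

template <typename Op>
using op_ret = std::variant<Op, pending, finished>;

// Hypothetical generator: produces the integers next, next + 1, ..., end - 1.
struct counting_gen {
    using operation_type = int;

    int next;
    int end;

    // `const`: fetching never modifies *this. The successor state is
    // returned next to the fetched operation.
    std::pair<op_ret<int>, counting_gen> fetch_op() const {
        if (next >= end) {
            return {finished{}, *this};
        }
        return {next, counting_gen{next + 1, end}};
    }
};
```

Calling `fetch_op` twice on the same generator value yields the same
operation. The caller advances only by switching to the returned generator.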

The generator concept is inspired by the generators in the Jepsen
testing library for distributed systems.

We also implement `interpreter` which "interprets", or "runs", a given
generator, by fetching operations from the generator and executing them
with concurrency controlled by the abstract threads.

The algorithm used in the interpreter is also similar to the interpreter
algorithm in Jepsen, although there are differences. Most notably we don't
have a "worker" concept - everything runs on a single shard; but we use
"abstract threads" combined with futures for concurrency.
There is also no notion of "process". Finally, the interpreter doesn't
keep an explicit history, but instead uses a callback `Recorder` to notify
the user about operation invocations and completions. The user can
decide to save these events in a history, or perhaps they can analyze
them on the fly using constant memory.
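
As an illustration of the two recording styles, here is a small sketch. The
event types, both recorders and `run_ops` are hypothetical; `run_ops` is a
sequential stand-in for the interpreter, which in the patch is asynchronous
and records `Op` and `operation::completion<Op>` values.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical event types.
struct invocation { int op; };
struct completion { int op; int result; };

// Recorder that saves every event in a history.
struct history_recorder {
    std::vector<std::string> events;
    void operator()(const invocation& i) { events.push_back("invoke " + std::to_string(i.op)); }
    void operator()(const completion& c) { events.push_back("complete " + std::to_string(c.op)); }
};

// Recorder that analyzes events on the fly using constant memory.
struct counting_recorder {
    std::size_t invoked = 0;
    std::size_t completed = 0;
    void operator()(const invocation&) { ++invoked; }
    void operator()(const completion&) { ++completed; }
};

// Sequential stand-in for the interpreter: records the invocation,
// "executes" the operation (here: doubles it), records the completion.
template <typename Recorder>
void run_ops(const std::vector<int>& ops, Recorder& record) {
    for (int op : ops) {
        record(invocation{op});
        record(completion{op, op * 2});
    }
}
```

Both recorders work with the same `run_ops`; only the callback decides
whether events are stored or summarized.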

Operations and generators can be composed to create more complex
operations and generators. There are certain composition patterns useful
for many different test scenarios.

We implement a couple of such patterns. For example:
- Given multiple different operation types, we can create a new
operation type - `either_of` - which is a "union" of the original
operation types. Executing an `either_of` operation means executing an
operation of one of the original types, but the specific type
can be chosen at runtime.
- Given a generator `g`, `op_limit(n, g)` is a new generator which
limits the number of operations produced by `g`.
- Given a generator `g` and a time duration of `d` ticks, `stagger(g, d)` is a
new generator which spreads the operations from `g` roughly every `d`
ticks. (The actual definition in code is more general and complex but
the idea is similar.)
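
A rough sketch of how a combinator like `op_limit` can wrap another generator
is shown below. It is an approximation and not the code from the patch:
`constant_gen`, `op_limit_gen` and `drain` are written for this example, and
the context argument of `fetch_op` is omitted.

```cpp
#include <cstddef>
#include <utility>
#include <variant>
#include <vector>

struct pending {};
struct finished {};

template <typename Op>
using op_ret = std::variant<Op, pending, finished>;

// Hypothetical infinite generator: always produces the same integer.
struct constant_gen {
    using operation_type = int;
    int value;

    std::pair<op_ret<int>, constant_gen> fetch_op() const {
        return {value, *this};
    }
};

// Wraps a generator and stops after `remaining` operations were produced.
template <typename G>
struct op_limit_gen {
    using operation_type = typename G::operation_type;

    std::size_t remaining;
    G gen;

    std::pair<op_ret<operation_type>, op_limit_gen> fetch_op() const {
        if (remaining == 0) {
            return {finished{}, *this};
        }
        auto [op, next] = gen.fetch_op();
        // Only a real operation uses up the budget; `pending` and
        // `finished` from the inner generator are passed through.
        std::size_t left = std::holds_alternative<operation_type>(op) ? remaining - 1 : remaining;
        return {std::move(op), op_limit_gen{left, std::move(next)}};
    }
};

template <typename G>
op_limit_gen<G> op_limit(std::size_t n, G g) {
    return {n, std::move(g)};
}

// Helper for the example: collects operations until the generator
// returns something other than an operation.
template <typename G>
std::vector<typename G::operation_type> drain(G g) {
    std::vector<typename G::operation_type> out;
    while (true) {
        auto [op, next] = g.fetch_op();
        if (!std::holds_alternative<typename G::operation_type>(op)) {
            break;
        }
        out.push_back(std::get<typename G::operation_type>(std::move(op)));
        g = std::move(next);
    }
    return out;
}
```

With these definitions, `drain(op_limit(3, constant_gen{7}))` collects three
operations from a generator that would otherwise never finish.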

Some of these patterns have corresponding notions in Jepsen, e.g. our
`stagger` has a corresponding `stagger` in Jepsen (although our
`stagger` is more general).

Finally, we implement a test that uses this new infrastructure.

Two `Executable` operations are implemented:
- `raft_call` calls a Raft cluster with a given state
machine command,
- `network_majority_grudge` partitions the network in half,
putting the leader in the minority.

We run a workload of these operations against a cluster of 5 nodes with
6 threads for executing the operations: one "nemesis thread" for
`network_majority_grudge` and 5 "client threads" for `raft_call`.
Each client thread randomly chooses a contact point which it tries first
when executing a `raft_call`, but it can also "bounce": call a
different server when the previous one returned "not_a_leader" (we use the
generic `bouncing` wrapper to do this).

For now we only print the resulting history. In a follow-up patchset
we will analyze it for consistency anomalies.

Kamil Braun (5):
test: raft: randomized_nemesis_test: handle `raft::stopped_error` in
timeout futures
test: raft: introduce `future_set`
test: raft: introduce generators
test: raft: generator: a library of basic generators
test: raft: randomized_nemesis_test: a basic generator test

test/raft/future_set.hh | 105 ++++
test/raft/generator.hh | 844 +++++++++++++++++++++++++++
test/raft/randomized_nemesis_test.cc | 341 ++++++++++-
3 files changed, 1287 insertions(+), 3 deletions(-)
create mode 100644 test/raft/future_set.hh
create mode 100644 test/raft/generator.hh

---

Requires the "randomized nemesis test improvements" patchset.

GIT URL: https://github.com/kbr-/scylla/commits/raft-test-generator-v3

v2:
- new `sequence` generator in the "generators library" commit (see the comment)
- remove OpFactory concept, use std::invocable + invoke_result in the
basic generators (so I can use lambdas when defining generators in tests)
- the basic generators (random, constant, sequence) store their
operation factories in an `assignable` wrapper that implements
operator= for move/copy-assignable types (useful for e.g. lambdas)
- rename `op` to more expressive `fetch_op`
- store `std::exception_ptr` instead of `std::exception` in
`operation::exceptional_result` (so we don't slice the type)
- `either_of` is now a `struct` with public members - in later commits
it turns out I needed to access the `op` field sometimes
- similarly for `raft_call`
- fix a bug in `stagger_gen` - it would use a constant value for the delta
instead of randomly sampling it
- `raft_call` is a bit simpler, there is no `thread_state`, for now the
contact point is chosen randomly
- use a random seed in the test, a specific seed can be provided
using `random-seed` flag
- use lambdas to generate operations instead of special factory types
(`make_majority_grudge`, `make_exchange`...)
- a generic `bouncing` wrapper - previously there was the more specific `bouncing_call`.
For now `bouncing` is used only for calls, but it will also be used for reconfiguration
requests later.

--
2.31.1

Kamil Braun

<kbraun@scylladb.com>
Jul 7, 2021, 5:43:37 AM
to scylladb-dev@googlegroups.com, gleb@scylladb.com, kostja@scylladb.com, Kamil Braun
The timeout futures in `call` and `reconfigure` may be discarded after
the Raft servers have been `abort()`ed, in which case they resolve with
`raft::stopped_error` and the test complains about discarded
exceptional futures. Discard these errors explicitly.
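
The rethrow-and-catch idiom used by the new handlers can be tried out without
Seastar. The sketch below uses hypothetical stand-in exception types and
reports the outcome as a `bool`. The handlers in the patch have no catch-all
clause, so any other exception propagates out of the handler.

```cpp
#include <exception>
#include <stdexcept>

// Hypothetical stand-ins for raft::dropped_entry and raft::stopped_error.
struct dropped_entry : std::runtime_error { using std::runtime_error::runtime_error; };
struct stopped_error : std::runtime_error { using std::runtime_error::runtime_error; };

// Returns true if `eptr` holds one of the errors we expect and want to ignore.
bool is_expected(std::exception_ptr eptr) {
    try {
        std::rethrow_exception(eptr);
    } catch (const dropped_entry&) {
        return true;
    } catch (const stopped_error&) {
        return true;
    } catch (...) {
        // Anything else is unexpected (this catch-all is not in the patch).
        return false;
    }
}
```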
---
test/raft/randomized_nemesis_test.cc | 18 +++++++++++++++---
1 file changed, 15 insertions(+), 3 deletions(-)

diff --git a/test/raft/randomized_nemesis_test.cc b/test/raft/randomized_nemesis_test.cc
index edd93bd32..ce4e16522 100644
--- a/test/raft/randomized_nemesis_test.cc
+++ b/test/raft/randomized_nemesis_test.cc
@@ -251,8 +251,14 @@ future<call_result_t<M>> call(
return make_ready_future<call_result_t<M>>(e);
} catch (logical_timer::timed_out<typename M::output_t> e) {
(void)e.get_future().discard_result()
- .handle_exception_type([] (const output_channel_dropped&) {})
- .handle_exception_type([] (const raft::dropped_entry&) {});
+ .handle_exception([] (std::exception_ptr eptr) {
+ try {
+ std::rethrow_exception(eptr);
+ } catch (const output_channel_dropped&) {
+ } catch (const raft::dropped_entry&) {
+ } catch (const raft::stopped_error&) {
+ }
+ });
return make_ready_future<call_result_t<M>>(timed_out_error{});
}
});
@@ -828,7 +834,13 @@ future<reconfigure_result_t> reconfigure(
co_return e;
} catch (logical_timer::timed_out<void> e) {
(void)e.get_future().discard_result()
- .handle_exception_type([] (const raft::dropped_entry&) {});
+ .handle_exception([] (std::exception_ptr eptr) {
+ try {
+ std::rethrow_exception(eptr);
+ } catch (const raft::dropped_entry&) {
+ } catch (const raft::stopped_error&) {
+ }
+ });
co_return timed_out_error{};
}
}
--
2.31.1

Kamil Braun

<kbraun@scylladb.com>
Jul 7, 2021, 5:43:38 AM
to scylladb-dev@googlegroups.com, gleb@scylladb.com, kostja@scylladb.com, Kamil Braun
A set of futures that can be polled.

Polling the set (`poll` function) returns the value of one of
the futures which became available or `std::nullopt` if the given
logical duration passes (according to the given timer), whichever
event happens first. The current implementation assumes sequential
polling.

New futures can be added to the set with `add`.
All futures can be removed from the set with `release`.
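
The step of `poll` that takes one available future out of the set can be
illustrated in isolation. In this sketch a `std::optional<T>` stands in for a
future (an empty optional means "not ready yet"); waiting and the timeout are
left out, and `take_ready` is a name made up for the example.

```cpp
#include <optional>
#include <utility>
#include <vector>

// Stand-in for a future: an empty optional is a future that is not ready.
template <typename T>
using slot = std::optional<T>;

// Removes one ready slot from `slots` and returns its value,
// or returns `std::nullopt` if no slot is ready.
template <typename T>
std::optional<T> take_ready(std::vector<slot<T>>& slots) {
    for (auto& s : slots) {
        if (s.has_value()) {
            // Swap with the last element so the removal is O(1).
            std::swap(s, slots.back());
            T v = std::move(*slots.back());
            slots.pop_back();
            return v;
        }
    }
    return std::nullopt;
}
```

The order of the remaining elements is not preserved, which is acceptable for
a set.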
---
test/raft/future_set.hh | 105 ++++++++++++++++++++++++++++++++++++++++
1 file changed, 105 insertions(+)
create mode 100644 test/raft/future_set.hh

diff --git a/test/raft/future_set.hh b/test/raft/future_set.hh
new file mode 100644
index 000000000..e7fe0091a
--- /dev/null
+++ b/test/raft/future_set.hh
@@ -0,0 +1,105 @@
+/*
+ * Copyright (C) 2021 ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include <seastar/core/coroutine.hh>
+#include <seastar/core/weak_ptr.hh>
+#include <seastar/core/condition-variable.hh>
+
+using namespace seastar;
+
+// A set of futures that can be polled to obtain the result of some ready future in the set.
+//
+// Note: the set must be empty on destruction. Call `release` to ensure emptiness.
+template <typename T>
+class future_set {
+ struct cond_var_container : public seastar::weakly_referencable<cond_var_container> {
+ seastar::condition_variable v;
+ };
+
+ std::vector<future<T>> _futures;
+ cond_var_container _container;
+
+public:
+ // Polling the set returns the value of one of the futures which became available
+ // or `std::nullopt` if the logical duration `d` passes (according to `timer`),
+ // whichever event happens first.
+ //
+ // Cannot be called in parallel.
+ // TODO: we could probably lift this restriction by using `broadcast()` instead of `signal()`. Think about it.
+ future<std::optional<T>> poll(logical_timer& timer, raft::logical_clock::duration d) {
+ auto timeout = timer.now() + d;
+
+ auto wake_condition = [this, &timer, timeout] {
+ return std::any_of(_futures.begin(), _futures.end(), std::mem_fn(&future<T>::available)) || timer.now() >= timeout;
+ };
+
+ if (timer.now() < timeout) { // i.e. d > 0
+ // Wake ourselves up when the timeout passes (if we're still waiting at that point).
+ // If nothing else wakes us, this will.
+ timer.schedule(timeout, [ptr = _container.weak_from_this()] {
+ if (ptr) {
+ ptr->v.signal();
+ }
+ });
+
+ co_await _container.v.wait(wake_condition);
+ }
+
+ assert(wake_condition());
+
+ for (auto& f : _futures) {
+ if (f.available()) {
+ std::swap(f, _futures.back());
+ auto ff = std::move(_futures.back());
+ _futures.pop_back();
+ co_return std::move(ff).get();
+ }
+ }
+
+ // No future was available, so `wake_condition()` implies:
+ assert(timer.now() >= timeout);
+ co_return std::nullopt;
+ }
+
+ void add(future<T> f) {
+ _futures.push_back(std::move(f).finally([ptr = _container.weak_from_this()] {
+ if (ptr) {
+ ptr->v.signal();
+ }
+ }));
+ }
+
+ // Removes all futures from the set and returns them (even if they are not ready yet).
+ // The user must ensure that there are no futures in the set when it's destroyed; this is a good way to do so.
+ std::vector<future<T>> release() {
+ return std::exchange(_futures, {});
+ }
+
+ bool empty() const {
+ return _futures.empty();
+ }
+
+ ~future_set() {
+ assert(_futures.empty());
+ }
+};
--
2.31.1

Kamil Braun

<kbraun@scylladb.com>
Jul 7, 2021, 5:43:39 AM
to scylladb-dev@googlegroups.com, gleb@scylladb.com, kostja@scylladb.com, Kamil Braun
---
test/raft/generator.hh | 436 +++++++++++++++++++++++++++++++++++++++++
1 file changed, 436 insertions(+)
create mode 100644 test/raft/generator.hh

diff --git a/test/raft/generator.hh b/test/raft/generator.hh
new file mode 100644
index 000000000..e4004b9e8
--- /dev/null
+++ b/test/raft/generator.hh
@@ -0,0 +1,436 @@
+/*
+ * Copyright (C) 2021-present ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include <unordered_set>
+
+#include <seastar/util/variant_utils.hh>
+#include "utils/chunked_vector.hh"
+
+#include "test/raft/future_set.hh"
+
+namespace operation {
+
+// Operation execution thread identifier.
+// See `Executable` concept.
+struct thread_id {
+ bool operator==(const thread_id&) const = default;
+ size_t id;
+};
+
+} // namespace operation
+
+namespace std {
+template<> struct hash<operation::thread_id> {
+ size_t operator()(const operation::thread_id& tid) const {
+ return hash<size_t>()(tid.id);
+ }
+};
+} // namespace std
+
+namespace operation {
+
+using thread_set = std::unordered_set<thread_id>;
+
+thread_id some(const thread_set& s) {
+ assert(!s.empty());
+ return *s.begin();
+}
+
+// Make a set of `n` threads.
+thread_set make_thread_set(size_t n) {
+ thread_set s;
+ for (size_t i = 0; i < n; ++i) {
+ s.insert(thread_id{i});
+ }
+ return s;
+}
+
+// Operation execution context.
+// See `Executable` concept.
+struct context {
+ thread_id thread;
+ raft::logical_clock::time_point start;
+};
+
+// An executable operation.
+// Represents a computation which may cause side effects.
+//
+// The computation is performed through the `execute` function. It returns a result
+// of type `result_type` (for computations that don't have a result, use `std::monostate` or equivalent).
+//
+// Different computations of the same type may share state of type `state_type`; for example,
+// the state may contain database handles. A reference to the state is passed to `execute`.
+//
+// Each execution is performed on an abstract `thread' (represented by a `thread_id`)
+// and has a logical start time point. The thread and start point together form
+// the execution's `context` which is also passed as a reference to `execute`.
+//
+// Two operations may be called in parallel only if they are on different threads.
+template <typename Op>
+concept Executable = requires (Op o, typename Op::state_type& s, const context& ctx) {
+ typename Op::result_type;
+ typename Op::state_type;
+
+ { o.execute(s, ctx) } -> std::same_as<future<typename Op::result_type>>;
+};
+
+// An operation which can specify a logical point in time when it's ready.
+// We will say that such operations `understand time'.
+template <typename Op>
+concept HasReadyTime = requires(Op o) {
+ { o.ready } -> std::same_as<std::optional<raft::logical_clock::time_point>>;
+};
+
+// An operation which can specify a specific thread on which it can be executed.
+// We will say that such operations `understand threads`.
+// If the operation specifies a thread, it won't be executed on any other thread.
+template <typename Op>
+concept HasThread = requires(Op o) {
+ { o.thread } -> std::same_as<std::optional<thread_id>>;
+};
+
+// An operation which finished with an unexpected / unhandled exception.
+// Generally it is recommended to handle any failures in the operation's `execute`
+// function and return them in the result.
+template <typename Op>
+struct exceptional_result {
+ Op op;
+ std::exception_ptr eptr;
+};
+
+template <Executable Op>
+using execute_result = std::variant<typename Op::result_type, exceptional_result<Op>>;
+
+// A completion of an operation's execution.
+// Contains the result, the logical time point when the operation finished
+// and the thread on which the operation was executed.
+template <Executable Op>
+struct completion {
+ execute_result<Op> result;
+ raft::logical_clock::time_point time;
+ thread_id thread;
+};
+
+template <typename Op>
+concept Invocable = Executable<Op> && HasReadyTime<Op> && HasThread<Op>;
+
+} // namespace operation
+
+namespace generator {
+
+// A generator run context.
+// See `Generator` concept.
+struct context {
+ raft::logical_clock::time_point now;
+
+ // free_threads must be a subset of all_threads
+ const operation::thread_set& all_threads;
+ const operation::thread_set& free_threads;
+};
+
+struct pending {};
+struct finished {};
+
+// Possible results for requesting an operation from a generator.
+// See `Generator` concept.
+//
+// A successful fetch results in an operation (`Op`).
+// The generator may also return `pending`, denoting that it doesn't have an operation
+// ready yet but may in the future (e.g. when the time advances or the set of free threads changes),
+// or `finished`, denoting that it has no more operations to return.
+//
+// Note: the order of types in the variant is part of the interface, i.e. the user may depend on it.
+template <typename Op>
+using op_ret = std::variant<Op, pending, finished>;
+
+template <typename Op, typename G>
+using op_and_gen = std::pair<op_ret<Op>, G>;
+
+// A generator is a data structure that produces a sequence of operations.
+// Fetch an operation using the `fetch_op` function.
+// The type of produced operations is `operation_type`.
+//
+// The returned operation depends on the generator's state and a context provided
+// when fetching the operation. The context contains:
+// - A logical time point representing the current time; the time point should
+// be non-decreasing in subsequent `fetch_op` calls.
+// - A set of threads used to execute the fetched operations.
+// - A subset of that set containing the free threads, i.e. threads that are currently
+// not executing any operations.
+//
+// `fetch_op` may assume that there's at least one free thread;
+// the user should not call `fetch_op` otherwise.
+//
+// Other than an operation, the `fetch_op` function may also return `pending` or `finished`.
+// See comment on `op_ret` above for their meaning.
+//
+// Generators are purely functional data structures. When an operation is fetched,
+// it doesn't change the generator's state; instead, it returns a new generator
+// whose state represents the modified state after fetching the operation.
+// This allows one to easily compose generators. The user of a generator may
+// also decide not to use a fetched operation, but instead call `fetch_op` again later
+// on the same generator when the situation changes (e.g. there's a new free thread).
+//
+// TODO: consider using `seastar::lazy_eval` or equivalent for the returned generator.
+template <typename G>
+concept Generator = requires(const G g, const context& ctx) {
+ typename G::operation_type;
+
+ // Fetch an operation.
+ //
+ // The implementation should be deterministic
+ // (it should produce the same operation given the same generator and context).
+ { g.fetch_op(ctx) } -> std::same_as<op_and_gen<typename G::operation_type, G>>;
+};
+
+template <typename G, typename Op>
+concept GeneratorOf = Generator<G> && std::is_same_v<typename G::operation_type, Op>;
+
+// Runs the provided generator, fetching operations from it and executing them on the provided
+// set of threads until the generator is exhausted (returns `finished`).
+//
+// The interpreter assumes an externally ticked `logical_timer`. The time returned from the timer
+// can be used by the interpreted generator. For example, the generator may decide not to return
+// an operation until a later time point.
+//
+// The operation type is an `Invocable` meaning that the generator can specify through the operation's
+// fields the logical time point when the operation is ready (see `HasReadyTime`) or the thread
+// on which the operation can execute (see `HasThread`).
+//
+// The interpreter guarantees the generator the following:
+// - Subsequent calls to `fetch_op` have non-decreasing values of `context::now`.
+// - `context::all_threads` is a constant non-empty set.
+// - `context::free_threads` is a non-empty subset of `context::all_threads`.
+//
+// The interpreter requires the following from the generator:
+// - If the generator returns `pending`, then given a context satisfying `free_threads = all_threads`
+// and large enough `now` it must return an operation.
+// - If the returned operation specifies a thread (the `thread` field is not `nullopt`),
+// the thread must be an element of `free_threads` that was provided in the context passed to `fetch_op`.
+// - If the returned operation specifies a ready time (the `ready` field is not `nullopt`),
+// the time must be greater than `now` that was provided in the context passed to `fetch_op`.
+//
+// Given an operation `Op op`, interpreter guarantees the following to `op.execute(s, ctx)`
+// for `Op::state_type& s` and `operation::context ctx`:
+// - `op.execute` is called only once.
+// - `s` refers to the same object for all `execute` calls (for different operations).
+// - If `op.thread` was not `nullopt`, then `ctx.thread = *op.thread`.
+// - If `op.ready` was not `nullopt`, then `ctx.start >= *op.ready`.
+// - Two executions with the same `ctx.thread` are sequential
+// (the future returned by one of them resolved before the other has started).
+//
+// The interpreter requires `op.execute` to finish eventually.
+//
+// If the generator returns an operation which does not specify `ready` or the specified `ready`
+// satisfies `*ready <= ctx.now`, the interpreter calls `execute` immediately
+// (there is no reactor yield in between). Note: by the above requirements, since the operation
+// was fetched, at least one thread is free; if the operation specified a `thread`,
+// it belongs to the set of free threads; thus it is possible to execute the operation.
+//
+// If the generator returned `pending` or an operation which specified `*ready > ctx.now`,
+// the interpreter will retry the fetch later.
+//
+// Given an operation `op`, before calling `execute`, the interpreter records the operation
+// by calling the given `Recorder`. After `execute` finishes, the interpreter records
+// the corresponding completion.
+//
+// When the interpreter run finishes, it is guaranteed that:
+// - all operation executions have completed,
+// - the generator has returned `finished`,
+// - every recorded operation has a corresponding recorded completion.
+template <operation::Invocable Op, GeneratorOf<Op> G, typename Recorder>
+requires std::invocable<Recorder, Op> && std::invocable<Recorder, operation::completion<Op>>
+class interpreter {
+ G _generator;
+
+ const operation::thread_set _all_threads;
+ operation::thread_set _free_threads; // a subset of _all_threads
+
+ raft::logical_clock::duration _poll_timeout{0};
+ const raft::logical_clock::duration _max_pending_interval;
+
+ future_set<std::pair<operation::execute_result<Op>, operation::thread_id>> _invocations;
+ typename Op::state_type _op_state;
+
+ logical_timer& _timer;
+
+ Recorder _record;
+
+public:
+ // Create an interpreter for the given generator, set of threads, and initial state.
+ // Use `timer` as a source of logical time.
+ //
+ // If the generator returns `pending`, the interpreter will wait for operation completions
+ // for at most `max_pending_interval` before trying to fetch an operation again.
+ // It may retry the fetch earlier if it manages to get a completion in the meantime.
+ //
+ // Operation invocations and completions are recorded using the `record` function.
+ interpreter(G gen, operation::thread_set threads, raft::logical_clock::duration max_pending_interval,
+ typename Op::state_type init_op_state, logical_timer& timer, Recorder record)
+ : _generator(std::move(gen))
+ , _all_threads(threads)
+ , _free_threads(std::move(threads))
+ , _max_pending_interval(max_pending_interval)
+ , _op_state(std::move(init_op_state))
+ , _timer(timer)
+ , _record(std::move(record))
+ {
+ assert(!_all_threads.empty());
+ assert(_max_pending_interval > raft::logical_clock::duration{0});
+ }
+
+ // Run the interpreter and record all operation invocations and completions.
+ // Can only be executed once.
+ future<> run() {
+ while (true) {
+ // If there are any completions we want to record them as soon as possible
+ // so we don't introduce false concurrency for anomaly analyzers
+ // (concurrency makes the problem of looking for anomalies harder).
+ if (auto r = co_await _invocations.poll(_timer, _poll_timeout)) {
+ auto [res, tid] = std::move(*r);
+
+ assert(_all_threads.contains(tid));
+ assert(!_free_threads.contains(tid));
+ _free_threads.insert(tid);
+
+ _record(operation::completion<Op> {
+ .result = std::move(res),
+ .time = _timer.now(),
+ .thread = tid
+ });
+
+ // In the next iteration poll synchronously so if there will be no more completions
+ // to immediately fetch, we move on to the next generator operation.
+ _poll_timeout = raft::logical_clock::duration{0};
+ continue;
+ }
+
+ // No completions to handle at this time.
+ if (_free_threads.empty()) {
+ // No point in fetching an operation from the generator even if there is any;
+ // there is no thread to execute the operation at this time.
+ _poll_timeout = _max_pending_interval;
+ continue;
+ }
+
+ // There is a free thread, check the generator.
+ auto now = _timer.now();
+ auto [op, next_gen] = _generator.fetch_op(context {
+ .now = now,
+ .all_threads = _all_threads,
+ .free_threads = _free_threads,
+ });
+
+ bool stop = false;
+ std::visit(make_visitor(
+ [&stop] (finished) {
+ stop = true;
+ },
+ [this] (pending) {
+ _poll_timeout = _max_pending_interval;
+ },
+ [this, now, next_gen = std::move(next_gen)] (Op op) mutable {
+ if (!op.ready) {
+ op.ready = now;
+ }
+
+ if (now < *op.ready) {
+ _poll_timeout = *op.ready - now;
+ } else {
+ if (!op.thread) {
+ // We ensured !_free_threads.empty() before fetching the operation.
+ op.thread = some(_free_threads);
+ }
+
+ assert(_free_threads.contains(*op.thread));
+ _free_threads.erase(*op.thread);
+
+ _record(op);
+
+ operation::context ctx { .thread = *op.thread, .start = now};
+ _invocations.add(invoke(std::move(op), _op_state, std::move(ctx)));
+
+ _generator = std::move(next_gen);
+ _poll_timeout = raft::logical_clock::duration{0};
+ }
+ }
+ ), std::move(op));
+
+ if (stop) {
+ co_await exit();
+ co_return;
+ }
+ }
+ }
+
+ ~interpreter() {
+ // Ensured by `exit()`.
+ assert(_invocations.empty());
+ }
+
+private:
+ future<> exit() {
+ auto fs = _invocations.release();
+ co_await parallel_for_each(fs, [this] (future<std::pair<operation::execute_result<Op>, operation::thread_id>>& f) -> future<> {
+ auto [res, tid] = co_await std::move(f);
+ _record(operation::completion<Op>{
+ .result = std::move(res),
+ .time = _timer.now(),
+ .thread = tid
+ });
+ });
+ }
+
+ // Not a static method nor free function due to clang bug #50345.
+ future<std::pair<operation::execute_result<Op>, operation::thread_id>>
+ invoke(Op op, typename Op::state_type& op_state, operation::context ctx) {
+ try {
+ co_return std::pair{operation::execute_result<Op>{co_await op.execute(op_state, ctx)}, ctx.thread};
+ // TODO: consider failing in case of unknown/unexpected exception instead of storing it
+ } catch (...) {
+ co_return std::pair{operation::execute_result<Op>{
+ operation::exceptional_result<Op>{std::move(op), std::current_exception()}},
+ ctx.thread};
+ }
+ }
+};
+
+} // namespace generator
+
+namespace operation {
+
+template <typename Op>
+std::ostream& operator<<(std::ostream& os, const exceptional_result<Op>& r) {
+ try {
+ std::rethrow_exception(r.eptr);
+ } catch (const std::exception& e) {
+ return os << format("exceptional{{i:{}, ex:{}}}", r.op, e);
+ }
+}
+
+template <operation::Executable Op>
+std::ostream& operator<<(std::ostream& os, const completion<Op>& c) {
+ return os << format("c{{r:{}, t:{}, tid:{}}}", c.result, c.time, c.thread);
+}
+
+} // namespace operation
--
2.31.1

Kamil Braun

<kbraun@scylladb.com>
Jul 7, 2021, 5:43:40 AM
to scylladb-dev@googlegroups.com, gleb@scylladb.com, kostja@scylladb.com, Kamil Braun
Operations and generators can be composed to create more complex
operations and generators. There are certain composition patterns useful
for many different test scenarios.

This commit introduces a couple of such patterns. For example:
- Given multiple different operation types, we can create a new
operation type - `either_of` - which is a "union" of the original
operation types. Executing an `either_of` operation means executing an
operation of one of the original types, but the specific type
can be chosen at runtime.
- Given a generator `g`, `op_limit(n, g)` is a new generator which
limits the number of operations produced by `g`.
- Given a generator `g` and a time duration of `d` ticks, `stagger(g, d)` is a
new generator which spreads the operations from `g` roughly every `d`
ticks. (The actual definition in code is more general and complex but
the idea is similar.)

And so on.

Some of these patterns have corresponding notions in Jepsen, e.g. our
`stagger` has a corresponding `stagger` in Jepsen (although our
`stagger` is more general).
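
The "union of operations, product of states" idea behind `either_of` can be
sketched with the standard library alone. This is a simplified approximation:
`incr` and `say` are hypothetical operations, `execute` is synchronous and
takes no context, and the dispatch uses a fold expression where the patch
uses `boost::mp11::mp_with_index`.

```cpp
#include <cstddef>
#include <optional>
#include <string>
#include <tuple>
#include <utility>
#include <variant>

// Two hypothetical operation types with different state and result types.
struct incr {
    using result_type = int;
    using state_type = int;
    int by;
    result_type execute(state_type& s) { s += by; return s; }
};

struct say {
    using result_type = std::string;
    using state_type = std::string;
    std::string word;
    result_type execute(state_type& s) { s += word; return s; }
};

template <typename... Ops>
struct either_of {
    std::variant<Ops...> op;

    using result_type = std::variant<typename Ops::result_type...>;
    using state_type = std::tuple<typename Ops::state_type...>;

    result_type execute(state_type& s) {
        return dispatch(s, std::index_sequence_for<Ops...>{});
    }

    template <std::size_t... Is>
    result_type dispatch(state_type& s, std::index_sequence<Is...>) {
        std::optional<result_type> r;
        // Visit every index; only the one matching `op.index()` executes,
        // using the matching projection of the product state.
        ((op.index() == Is
            ? (void)r.emplace(std::in_place_index<Is>, std::get<Is>(op).execute(std::get<Is>(s)))
            : (void)0), ...);
        return std::move(*r);
    }
};
```

An `either_of<incr, say>` holding an `incr` only touches the `int` part of the
state tuple, and one holding a `say` only touches the `std::string` part.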
---
test/raft/generator.hh | 408 +++++++++++++++++++++++++++++++++++++++++
1 file changed, 408 insertions(+)

diff --git a/test/raft/generator.hh b/test/raft/generator.hh
index e4004b9e8..2bc50ca09 100644
--- a/test/raft/generator.hh
+++ b/test/raft/generator.hh
@@ -22,6 +22,10 @@
#pragma once

#include <unordered_set>
+#include <random>
+
+#include <boost/mp11/algorithm.hpp>
+#include <boost/implicit_cast.hpp>

#include <seastar/util/variant_utils.hh>
#include "utils/chunked_vector.hh"
@@ -419,6 +423,410 @@ class interpreter {

namespace operation {

+// Given an Executable operation type create an Invocable operation type.
+template <Executable Op>
+struct invocable : public Op {
+ using typename Op::result_type;
+ using typename Op::state_type;
+
+ using Op::execute;
+ using Op::Op;
+
+ std::optional<raft::logical_clock::time_point> ready;
+ std::optional<thread_id> thread;
+};
+
+// Given a non-empty set of Executable operation types, create an Executable operation type representing their union (sum type).
+//
+// The state type of the union is a product of the original state types,
+// i.e. the state of the union contains the states of each operation.
+// The result type is a union of the original result types.
+//
+// An operation of this type is an operation of one of the original types.
+// The state provided to `execute` is a projection of the product state
+// onto the state type corresponding to the original operation type.
+template <Executable... Ops>
+struct either_of {
+ static_assert((std::is_same_v<Ops, Ops> || ...)); // pack is non-empty
+
+ std::variant<Ops...> op;
+
+ using result_type = std::variant<typename Ops::result_type...>;
+ using state_type = std::tuple<typename Ops::state_type...>;
+
+ future<result_type> execute(state_type& s, const context& ctx) {
+ // `co_return co_await` instead of simply `return` to keep the lambda alive during `execute`
+ co_return co_await boost::mp11::mp_with_index<std::tuple_size_v<state_type>>(op.index(),
+ [&s, &ctx, this] (auto I) -> future<result_type> {
+ co_return result_type{co_await std::get<I>(op).execute(std::get<I>(s), ctx)};
+ });
+ }
+
+ template <Executable Op>
+ requires (std::is_same_v<Op, Ops> || ...) // Ops contain Op
+ either_of(Op o) : op(std::move(o)) {
+ static_assert(Executable<either_of<Ops...>>);
+ }
+
+ friend std::ostream& operator<<(std::ostream& os, const either_of& e) {
+ return os << e.op;
+ }
+};
+
+} // namespace operation
+
+// Convert a copy/move constructible type to a copy/move assignable type.
+template <typename T>
+requires std::is_nothrow_move_constructible_v<T>
+class assignable {
+ // Always engaged. Stored in optional for its `emplace` member function.
+ std::optional<T> _v;
+
+public:
+ assignable(T v) : _v(std::move(v)) {}
+ assignable(const assignable&) = default;
+ assignable(assignable&&) = default;
+
+ assignable& operator=(assignable a) {
+ // nothrow_move_constructible guarantees that this does not throw:
+ _v.emplace(std::move(*a._v));
+ return *this;
+ }
+
+ operator T&() { return *_v; }
+ operator const T&() const { return *_v; }
+};
+
+namespace generator {
+
+template <std::invocable<std::mt19937&> F>
+requires std::is_nothrow_move_constructible_v<F>
+struct random_gen {
+ using operation_type = std::invoke_result_t<F, std::mt19937&>;
+
+ op_and_gen<operation_type, random_gen> fetch_op(const context&) const {
+ auto e = _engine;
+ auto r = boost::implicit_cast<F>(_f)(e);
+ return {std::move(r), random_gen{std::move(e), _f}};
+ }
+
+ std::mt19937 _engine;
+ assignable<F> _f;
+};
+
+// Given an operation factory that uses a pseudo-random number generator to produce an operation,
+// creates a generator that produces an infinite random sequence of operations from the factory.
+template <std::invocable<std::mt19937&> F>
+requires std::is_nothrow_move_constructible_v<F>
+random_gen<F> random(int seed, F f) {
+ static_assert(GeneratorOf<random_gen<F>, std::invoke_result_t<F, std::mt19937&>>);
+ return random_gen<F>{std::mt19937{seed}, std::move(f)};
+}
+
+template <std::invocable<> F>
+requires std::is_nothrow_move_constructible_v<F>
+struct constant_gen {
+ using operation_type = std::invoke_result_t<F>;
+
+ op_and_gen<operation_type, constant_gen> fetch_op(const context&) const {
+ return {boost::implicit_cast<F>(_f)(), *this};
+ }
+
+ assignable<F> _f;
+};
+
+// Given an operation factory of no arguments,
+// creates a generator that produces an infinite constant sequence of operations.
+template <std::invocable<> F>
+requires std::is_nothrow_move_constructible_v<F>
+constant_gen<F> constant(F f) {
+ static_assert(GeneratorOf<constant_gen<F>, std::invoke_result_t<F>>);
+ return constant_gen<F>{std::move(f)};
+}
+
+template <std::invocable<int32_t> F>
+requires std::is_nothrow_move_constructible_v<F>
+struct sequence_gen {
+ using operation_type = std::invoke_result_t<F, int32_t>;
+
+ op_and_gen<operation_type, sequence_gen> fetch_op(const context&) const {
+ return {boost::implicit_cast<F>(_f)(_next), sequence_gen{_next + 1, _f}};
+ }
+
+ int32_t _next;
+ assignable<F> _f;
+};
+
+// Given an operation factory using an integer to produce an operation,
+// creates a generator that produces an infinite sequence of operations from the factory
+// by passing consecutive integers starting from `start`.
+template <std::invocable<int32_t> F>
+requires std::is_nothrow_move_constructible_v<F>
+sequence_gen<F> sequence(int32_t start, F f) {
+ static_assert(GeneratorOf<sequence_gen<F>, std::invoke_result_t<F, int32_t>>);
+ return sequence_gen<F>{start, std::move(f)};
+}
+
+template <Generator G>
+struct op_limit_gen {
+ using operation_type = typename G::operation_type;
+
+ op_and_gen<operation_type, op_limit_gen> fetch_op(const context& ctx) const {
+ if (!_remaining) {
+ return {finished{}, *this};
+ }
+
+ auto [op, new_g] = _g.fetch_op(ctx);
+ return {std::move(op), op_limit_gen{_remaining - 1, std::move(new_g)}};
+ }
+
+ size_t _remaining;
+ G _g;
+};
+
+// Given a generator and an operation number `limit`,
+// creates a generator which truncates the sequence of operations returned by the original generator
+// to the first `limit` operations.
+// If the original generator finishes earlier, `op_limit` has no observable effect.
+template <Generator G>
+op_limit_gen<G> op_limit(size_t limit, G g) {
+ static_assert(GeneratorOf<op_limit_gen<G>, typename G::operation_type>);
+ return op_limit_gen<G>{limit, std::move(g)};
+}
+
+template <Generator G, std::predicate<operation::thread_id> P>
+requires operation::HasThread<typename G::operation_type>
+struct on_threads_gen {
+ using operation_type = typename G::operation_type;
+
+ op_and_gen<operation_type, on_threads_gen> fetch_op(const context& ctx) const {
+ operation::thread_set masked_all_threads;
+ masked_all_threads.reserve(ctx.all_threads.size());
+ for (auto& t : ctx.all_threads) {
+ if (_p(t)) {
+ masked_all_threads.insert(t);
+ }
+ }
+
+ if (masked_all_threads.empty()) {
+ return {finished{}, *this};
+ }
+
+ operation::thread_set masked_free_threads;
+ masked_free_threads.reserve(ctx.free_threads.size());
+ for (auto& t : ctx.free_threads) {
+ if (_p(t)) {
+ masked_free_threads.insert(t);
+ }
+ }
+
+ if (masked_free_threads.empty()) {
+ return {pending{}, *this};
+ }
+
+ auto [op, new_g] = _g.fetch_op(context {
+ .now = ctx.now,
+ .all_threads = masked_all_threads,
+ .free_threads = masked_free_threads
+ });
+
+ if (auto i = std::get_if<operation_type>(&op)) {
+ if (i->thread) {
+ assert(masked_free_threads.contains(*i->thread));
+ } else {
+ // The underlying generator didn't assign a thread so we do it.
+ i->thread = some(masked_free_threads);
+ }
+ }
+
+ return {std::move(op), on_threads_gen{_p, std::move(new_g)}};
+ }
+
+ P _p;
+ G _g;
+};
+
+// Given a generator whose operations understand threads and a predicate on threads,
+// creates a generator which produces operations which always specify a thread
+// and the specified threads always satisfy the predicate.
+//
+// Finishes when the original generator finishes or no thread satisfies the predicate.
+// If some thread satisfies the predicate but no thread satisfying the predicate
+// is currently free, returns `pending`.
+//
+// The underlying generator must respect `free_threads`, i.e. it must satisfy the following:
+// if the returned operation specifies a thread, the thread must be one of the `free_threads` passed in the context.
+template <Generator G, std::predicate<operation::thread_id> P>
+requires operation::HasThread<typename G::operation_type>
+on_threads_gen<G, P> on_threads(P p, G g) {
+ static_assert(GeneratorOf<on_threads_gen<G, P>, typename G::operation_type>);
+ return on_threads_gen<G, P>{std::move(p), std::move(g)};
+}
+
+// Compare two generator return values to determine which one is ``sooner''.
+// Operations are sooner than `pending`, `pending` is sooner than `finished`.
+// For two operations which both specify a ready time, their ready times are compared.
+// If one of them doesn't specify a ready time, we assume it is to be executed ``as soon as possible'',
+// so we assume that its ready time is the current time for comparison purposes (passed in `now`).
+//
+// The return type is `weak_ordering` where smaller represents sooner.
+template <typename Op>
+requires operation::HasReadyTime<Op>
+std::weak_ordering sooner(const op_ret<Op>& r1, const op_ret<Op>& r2, raft::logical_clock::time_point now) {
+ auto op1 = std::get_if<Op>(&r1);
+ auto op2 = std::get_if<Op>(&r2);
+
+ if (op1 && op2) {
+ auto t1 = op1->ready ? *op1->ready : now;
+ auto t2 = op2->ready ? *op2->ready : now;
+
+ return t1 <=> t2;
+ }
+
+ // one or both of them is not an operation (it's pending or finished)
+ // operation is sooner than pending which is sooner than finished
+ // since operation index = 0, pending index = 1, finished index = 2, this works:
+ return r1.index() <=> r2.index();
+}
+
+template <Generator G1, Generator G2>
+requires std::is_same_v<typename G1::operation_type, typename G2::operation_type> && operation::HasReadyTime<typename G1::operation_type>
+struct either_gen {
+ using operation_type = typename G1::operation_type;
+
+ op_and_gen<operation_type, either_gen> fetch_op(const context& ctx) const {
+ auto [op1, new_g1] = _g1.fetch_op(ctx);
+ auto [op2, new_g2] = _g2.fetch_op(ctx);
+
+ auto ord = sooner(op1, op2, ctx.now);
+ bool new_use_g1_on_tie = ord == std::weak_ordering::equivalent ? !_use_g1_on_tie : _use_g1_on_tie;
+
+ if ((ord == std::weak_ordering::equivalent && _use_g1_on_tie) || ord == std::weak_ordering::less) {
+ return {std::move(op1), either_gen{new_use_g1_on_tie, std::move(new_g1), _g2}};
+ }
+
+ return {std::move(op2), either_gen{new_use_g1_on_tie, _g1, std::move(new_g2)}};
+ }
+
+ // Which generator to use when a tie is detected
+ bool _use_g1_on_tie = true;
+
+ G1 _g1;
+ G2 _g2;
+};
+
+// Given two generators producing the same type of operations which understand time,
+// creates a generator in which each produced operation comes from one of the underlying generators.
+//
+// Between operations produced by both generators, chooses the one which is `sooner`.
+// In particular finishes when both generators finish.
+//
+// On the first tie takes an operation from `g1`. On the second and subsequent ties
+// takes an operation from the generator which was not used on the previous tie.
+template <Generator G1, Generator G2>
+requires std::is_same_v<typename G1::operation_type, typename G2::operation_type>
+ && operation::HasReadyTime<typename G1::operation_type>
+either_gen<G1, G2> either(G1 g1, G2 g2) {
+ static_assert(GeneratorOf<either_gen<G1, G2>, typename G1::operation_type>);
+ return either_gen<G1, G2>{._g1{std::move(g1)}, ._g2{std::move(g2)}};
+}
+
+// Given a thread and two generators producing the same type of operations which understand threads and time,
+// creates a generator in which each produced operation comes from one of the underlying generators;
+// furthermore, ensures that all operations from `g1` are assigned to the given thread while all operations
+// from `g2` are assigned to other threads.
+//
+// Ties are resolved as in `either`.
+//
+// Assumes that underlying generators respect `free_threads` (as in `on_threads`).
+template <Generator G1, Generator G2>
+requires std::is_same_v<typename G1::operation_type, typename G2::operation_type>
+ && operation::HasReadyTime<typename G1::operation_type>
+ && operation::HasThread<typename G1::operation_type>
+GeneratorOf<typename G1::operation_type> auto pin(operation::thread_id to, G1 g1, G2 g2) {
+ // Not using lambda because we need the copy constructor
+ struct on_pinned_t {
+ operation::thread_id _to;
+ bool operator()(const operation::thread_id& tid) const { return tid == _to; };
+ } on_pinned {to};
+
+ // Not using std::not_fn for the same reason
+ struct out_of_pinned_t {
+ operation::thread_id _to;
+ bool operator()(const operation::thread_id& tid) const { return tid != _to; };
+ } out_of_pinned {to};
+
+ auto gen = either(
+ on_threads(on_pinned, std::move(g1)),
+ on_threads(out_of_pinned, std::move(g2))
+ );
+
+ static_assert(GeneratorOf<decltype(gen), typename G1::operation_type>);
+ return gen;
+}
+
+template <Generator G>
+requires operation::HasReadyTime<typename G::operation_type>
+struct stagger_gen {
+ using operation_type = typename G::operation_type;
+ using distribution_type = std::uniform_int_distribution<raft::logical_clock::rep>;
+
+ op_and_gen<operation_type, stagger_gen> fetch_op(const context& ctx) const {
+ auto [res, new_g] = _g.fetch_op(ctx);
+
+ if (auto op = std::get_if<operation_type>(&res)) {
+ if (!op->ready || op->ready < _next_ready) {
+ // Need to delay the operation.
+ op->ready = _next_ready;
+ }
+
+ auto e = _engine;
+ distribution_type delta{_delta.param()};
+ auto next_ready = _next_ready + raft::logical_clock::duration{delta(e)};
+ return {std::move(res), stagger_gen{
+ std::move(e), delta,
+ next_ready, std::move(new_g)}
+ };
+ }
+
+ // pending or finished, nothing to do
+ return {std::move(res), *this};
+ }
+
+ std::mt19937 _engine;
+ distribution_type _delta;
+
+ raft::logical_clock::time_point _next_ready;
+ G _g;
+};
+
+// Given a generator whose operations understand time,
+// a logical `start` time point, and an interval of logical time durations [`delta_low`, `delta_high`],
+// produces a sequence of time points where each pair of subsequent time points differs by a time duration
+// chosen uniformly at random from the [`delta_low`, `delta_high`] interval;
+// then, for each operation in the sequence of operations produced by the underlying generator, modifies its `ready`
+// time to be at least as late as the corresponding time point in the above sequence.
+//
+// For example, if we randomly choose the sequence of time points [10, 20, 30], and the `ready` times specified
+// by operations from the underlying generator are [5, `nullopt`, 34], the `ready` times of the operations produced
+// by the new generator will be [10, 20, 34].
+//
+// In particular if `delta = delta_low = delta_high` and the underlying operations never specify `ready`,
+// the produced operations are scheduled to execute every `delta` ticks.
+template <Generator G>
+requires operation::HasReadyTime<typename G::operation_type>
+stagger_gen<G> stagger(
+ int seed, raft::logical_clock::time_point start,
+ raft::logical_clock::duration delta_low, raft::logical_clock::duration delta_high,
+ G g) {
+ static_assert(GeneratorOf<stagger_gen<G>, typename G::operation_type>);
+ return stagger_gen<G>{ ._engine{seed}, ._delta{delta_low.count(), delta_high.count()}, ._next_ready{start}, ._g{std::move(g)} };
+}
+
+} // namespace generator
+
+namespace operation {
+
template <typename Op>
std::ostream& operator<<(std::ostream& os, const exceptional_result<Op>& r) {
try {
--
2.31.1
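
Note on the patch above: the `sooner` ordering and the alternating tie-break used by `either` can be sketched without Seastar or C++20. This is a simplified illustration, not the patch's code: `kind`, `ret` and `tie_breaker` are hypothetical names, an integer sign stands in for `std::weak_ordering`, and the tie flag is mutated in place, whereas `either_gen` returns a new generator value.

```cpp
#include <cassert>
#include <optional>

using tick_t = long; // hypothetical stand-in for a logical time point

// Simplified stand-in for op_ret<Op>, ordered: operation < pending < finished.
enum class kind { operation = 0, pending = 1, finished = 2 };

struct ret {
    kind k;
    std::optional<tick_t> ready; // meaningful only when k == kind::operation
};

// Negative if r1 is sooner, positive if r2 is sooner, 0 on a tie.
// An operation without a ready time is treated as ready at `now`.
int sooner(const ret& r1, const ret& r2, tick_t now) {
    if (r1.k == kind::operation && r2.k == kind::operation) {
        tick_t t1 = r1.ready.value_or(now);
        tick_t t2 = r2.ready.value_or(now);
        return (t1 > t2) - (t1 < t2);
    }
    int i1 = static_cast<int>(r1.k);
    int i2 = static_cast<int>(r2.k);
    return (i1 > i2) - (i1 < i2);
}

struct tie_breaker {
    bool use_g1_on_tie = true;

    // Returns true if the first generator's result should be taken.
    // The preference flips only when a tie was actually observed.
    bool pick_first(const ret& r1, const ret& r2, tick_t now) {
        int ord = sooner(r1, r2, now);
        bool first = ord < 0 || (ord == 0 && use_g1_on_tie);
        if (ord == 0) {
            use_g1_on_tie = !use_g1_on_tie;
        }
        return first;
    }
};
```

Repeated ties therefore alternate between the two generators, starting with the first one, while non-tied comparisons leave the preference untouched.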

Kamil Braun

<kbraun@scylladb.com>
Jul 7, 2021, 5:43:41 AM7/7/21
to scylladb-dev@googlegroups.com, gleb@scylladb.com, kostja@scylladb.com, Kamil Braun
The previous commits introduced the basic generator concept and a
library of the most common composition patterns.

In this commit we implement a test that uses this new infrastructure.

Two `Executable` operations are implemented:
- `raft_call` calls a Raft cluster with a given state
machine command,
- `network_majority_grudge` partitions the network in half,
putting the leader in the minority.

We run a workload of these operations against a cluster of 5 nodes with
6 threads for executing the operations: one "nemesis thread" for
`network_majority_grudge` and 5 "client threads" for `raft_call`.
Each client thread randomly chooses a contact point which it tries first
when executing a `raft_call`, but it can also "bounce" - call a
different server when the previous returned "not_a_leader" (we use the
generic "bouncing" wrapper to do this).

For now we only print the resulting history. In a follow-up patchset
we will analyze it for consistency anomalies.
---
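Note (not part of the commit): the "bouncing" behaviour described above can be sketched synchronously. This is a simplified illustration under stated assumptions: `reply`, `call_with_bounces` and the integer `server_id` are hypothetical stand-ins, the known servers are kept in a vector rather than an unordered set, and the sleeps between attempts are omitted.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <optional>
#include <utility>
#include <vector>

using server_id = int; // hypothetical stand-in for raft::server_id

// Hypothetical, simplified call result: either a value or "not a leader",
// optionally carrying a hint about who the leader is.
struct reply {
    bool not_a_leader = false;
    std::optional<server_id> leader_hint;
    int value = 0;
};

// Calls `call` starting at `first`. While the reply is "not a leader" and
// bounces remain, retries at the hinted leader, or at the next known server
// (wrapping around) when there is no hint.
// Returns the last reply together with the last server contacted.
std::pair<reply, server_id> call_with_bounces(
        const std::function<reply(server_id)>& call,
        const std::vector<server_id>& known,
        server_id first, std::size_t bounces) {
    assert(!known.empty());
    auto it = std::find(known.begin(), known.end(), first);
    // If `first` is not a known server, start so that the next step wraps to known[0].
    std::size_t pos = it != known.end()
        ? static_cast<std::size_t>(it - known.begin())
        : known.size() - 1;
    server_id target = first;
    while (true) {
        reply r = call(target);
        if (!r.not_a_leader || bounces == 0) {
            return {r, target};
        }
        --bounces;
        if (r.leader_hint) {
            target = *r.leader_hint;
        } else {
            pos = (pos + 1) % known.size();
            target = known[pos];
        }
    }
}
```

As in the test, the caller only sees the final reply, so a `not_a_leader` result can still surface once the bounce budget is used up.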
test/raft/randomized_nemesis_test.cc | 323 +++++++++++++++++++++++++++
1 file changed, 323 insertions(+)

diff --git a/test/raft/randomized_nemesis_test.cc b/test/raft/randomized_nemesis_test.cc
index ce4e16522..c385ddd68 100644
--- a/test/raft/randomized_nemesis_test.cc
+++ b/test/raft/randomized_nemesis_test.cc
@@ -36,8 +36,12 @@
#include "idl/uuid.dist.hh"
#include "idl/uuid.dist.impl.hh"

+#include "test/lib/random_utils.hh"
#include "test/raft/logical_timer.hh"
#include "test/raft/ticker.hh"
+#include "test/raft/generator.hh"
+
+#include "to_string.hh"

using namespace seastar;
using namespace std::chrono_literals;
@@ -1225,6 +1229,18 @@ namespace ser {

bool operator==(ExReg::ret a, ExReg::ret b) { return a.x == b.x; }

+std::ostream& operator<<(std::ostream& os, const ExReg::ret& r) {
+ return os << format("ret{{{}}}", r.x);
+}
+
+std::ostream& operator<<(std::ostream& os, const ExReg::read&) {
+ return os << "read";
+}
+
+std::ostream& operator<<(std::ostream& os, const ExReg::exchange& e) {
+ return os << format("xng{{{}}}", e.x);
+}
+
// Wait until either one of `nodes` in `env` becomes a leader, or duration `d` expires according to `timer` (whichever happens first).
// If the leader is found, returns it. Otherwise throws a `logical_timer::timed_out` exception.
template <PureStateMachine M>
@@ -1327,3 +1343,310 @@ SEASTAR_TEST_CASE(basic_test) {

tlogger.debug("Finished");
}
+
+// Given a function `F` which takes a `raft::server_id` argument and returns a variant type
+// which contains `not_a_leader`, repeatedly calls `F` until it returns something other than
+// `not_a_leader` or until we reach a limit, whichever happens first.
+// The maximum number of calls until we give up is specified by `bounces`.
+// The initial `raft::server_id` argument provided to `F` is specified as an argument
+// to this function (`srv_id`). If the initial call returns `not_a_leader`, then:
+// - if the result contained a different leader ID, we will use it in the next call,
+// sleeping for `known_leader_delay` first,
+// - otherwise we will take the next ID from the `known` set, sleeping for
+// `unknown_leader_delay` first.
+// The returned result contains the result of the last call to `F` and the last
+// server ID passed to `F`.
+template <typename F>
+struct bouncing {
+ using future_type = std::invoke_result_t<F, raft::server_id>;
+ using value_type = typename future_type::value_type;
+
+ static_assert(boost::mp11::mp_contains<value_type, raft::not_a_leader>::value);
+
+ F _f;
+
+ bouncing(F f) : _f(std::move(f)) {}
+
+ // FIXME: change this into a free function after clang bug #50345 is fixed.
+ future<std::pair<value_type, raft::server_id>> operator()(
+ logical_timer& timer,
+ std::unordered_set<raft::server_id> known,
+ raft::server_id srv_id,
+ size_t bounces,
+ raft::logical_clock::duration known_leader_delay,
+ raft::logical_clock::duration unknown_leader_delay
+ ) {
+ auto it = known.find(srv_id);
+ while (true) {
+ auto res = co_await _f(srv_id);
+
+ if (auto n_a_l = std::get_if<raft::not_a_leader>(&res); n_a_l && bounces) {
+ --bounces;
+ if (n_a_l->leader) {
+ assert(n_a_l->leader != srv_id);
+ co_await timer.sleep(known_leader_delay);
+ srv_id = n_a_l->leader;
+ } else {
+ co_await timer.sleep(unknown_leader_delay);
+ assert(!known.empty());
+ if (it == known.end() || ++it == known.end()) {
+ it = known.begin();
+ }
+ srv_id = *it;
+ }
+ continue;
+ }
+
+ co_return std::pair{res, srv_id};
+ }
+ }
+};
+
+// An operation representing a call to the Raft cluster with a specific state machine input.
+// We may bounce a number of times if the server returns `not_a_leader` before giving up.
+template <PureStateMachine M>
+struct raft_call {
+ typename M::input_t input;
+ raft::logical_clock::duration timeout;
+
+ using result_type = call_result_t<M>;
+
+ struct state_type {
+ environment<M>& env;
+
+ // The set of servers that may be part of the current configuration.
+ // Sometimes we don't know the exact configuration, e.g. after a failed configuration change.
+ const std::unordered_set<raft::server_id>& known;
+
+ logical_timer& timer;
+ };
+
+ future<result_type> execute(state_type& s, const operation::context& ctx) {
+ // TODO a stable contact point used by a given thread would be preferable;
+ // the thread would switch only if necessary (the contact point left the configuration).
+ // Currently we choose the contact point randomly each time.
+ assert(s.known.size() > 0);
+ static std::mt19937 engine{0};
+
+ auto it = s.known.begin();
+ std::advance(it, std::uniform_int_distribution<size_t>{0, s.known.size() - 1}(engine));
+ auto contact = *it;
+
+ tlogger.debug("db call start inp {} tid {} start time {} current time {} contact {}", input, ctx.thread, ctx.start, s.timer.now(), contact);
+
+ auto [res, last] = co_await bouncing{[input = input, timeout = s.timer.now() + timeout, &timer = s.timer, &env = s.env] (raft::server_id id) {
+ return env.get_server(id).call(input, timeout, timer);
+ }}(s.timer, s.known, contact, 6, 10_t, 10_t);
+ tlogger.debug("db call end inp {} tid {} start time {} current time {} last contact {}", input, ctx.thread, ctx.start, s.timer.now(), last);
+
+ co_return res;
+ }
+
+ friend std::ostream& operator<<(std::ostream& os, const raft_call& c) {
+ return os << format("raft_call{{input:{},timeout:{}}}", c.input, c.timeout);
+ }
+};
+
+// An operation that partitions the network in half.
+// During the partition, no server from one half can contact any server from the other;
+// the partition is symmetric.
+// For an odd number of nodes, ensures that the current leader (if there is one) is in the minority.
+template <PureStateMachine M>
+class network_majority_grudge {
+ raft::logical_clock::duration _duration;
+
+public:
+ struct state_type {
+ environment<M>& env;
+ const std::unordered_set<raft::server_id>& known;
+ logical_timer& timer;
+ std::mt19937 rnd;
+ };
+
+ using result_type = std::monostate;
+
+ network_majority_grudge(raft::logical_clock::duration d) : _duration(d) {
+ static_assert(operation::Executable<network_majority_grudge<M>>);
+ }
+
+ future<result_type> execute(state_type& s, const operation::context& ctx) {
+ std::vector<raft::server_id> nodes{s.known.begin(), s.known.end()};
+ std::shuffle(nodes.begin(), nodes.end(), s.rnd);
+
+ auto mid = nodes.begin() + (nodes.size() / 2);
+ if (nodes.size() % 2) {
+ // Odd number of nodes, let's ensure that the leader (if there is one) is in the minority
+ auto it = std::find_if(mid, nodes.end(), [&env = s.env] (raft::server_id id) { return env.get_server(id).is_leader(); });
+ if (it != nodes.end()) {
+ std::swap(*nodes.begin(), *it);
+ }
+ }
+
+ // Note: creating the grudges has O(n^2) complexity, where n is the cluster size.
+ // May be problematic for (very) large clusters.
+ for (auto x = nodes.begin(); x != mid; ++x) {
+ for (auto y = mid; y != nodes.end(); ++y) {
+ s.env.get_network().add_grudge(*x, *y);
+ s.env.get_network().add_grudge(*y, *x);
+ }
+ }
+
+ tlogger.debug("network_majority_grudge start tid {} start time {} current time {} duration {} grudge: {} vs {}",
+ ctx.thread, ctx.start, s.timer.now(),
+ _duration,
+ std::vector<raft::server_id>{nodes.begin(), mid},
+ std::vector<raft::server_id>{mid, nodes.end()});
+
+ co_await s.timer.sleep(_duration);
+
+ tlogger.debug("network_majority_grudge end tid {} start time {} current time {}", ctx.thread, ctx.start, s.timer.now());
+
+ // Some servers in `nodes` may already be gone at this point but network doesn't care.
+ // It's safe to call `remove_grudge`.
+ for (auto x = nodes.begin(); x != mid; ++x) {
+ for (auto y = mid; y != nodes.end(); ++y) {
+ s.env.get_network().remove_grudge(*x, *y);
+ s.env.get_network().remove_grudge(*y, *x);
+ }
+ }
+
+ co_return std::monostate{};
+ }
+
+ friend std::ostream& operator<<(std::ostream& os, const network_majority_grudge& p) {
+ return os << format("network_majority_grudge{{duration:{}}}", p._duration);
+ }
+};
+
+std::ostream& operator<<(std::ostream& os, const std::monostate&) {
+ return os << "";
+}
+
+template <typename T, typename... Ts>
+std::ostream& operator<<(std::ostream& os, const std::variant<T, Ts...>& v) {
+ std::visit([&os] (auto& arg) { os << arg; }, v);
+ return os;
+}
+
+namespace operation {
+
+std::ostream& operator<<(std::ostream& os, const thread_id& tid) {
+ return os << format("thread_id{{{}}}", tid.id);
+}
+
+} // namespace operation
+
+SEASTAR_TEST_CASE(basic_generator_test) {
+ using op_type = operation::invocable<operation::either_of<raft_call<ExReg>, network_majority_grudge<ExReg>>>;
+ using history_t = utils::chunked_vector<std::variant<op_type, operation::completion<op_type>>>;
+
+ static_assert(operation::Invocable<op_type>);
+
+ logical_timer timer;
+ environment_config cfg {
+ .network_delay = 3_t,
+ .fd_convict_threshold = 50_t,
+ };
+ auto history = co_await with_env_and_ticker<ExReg>(cfg, [&cfg, &timer] (environment<ExReg>& env, ticker& t) -> future<history_t> {
+ t.start({
+ {1, [&] {
+ env.tick_network();
+ timer.tick();
+ }},
+ {10, [&] {
+ env.tick_servers();
+ }}
+ }, 10'000);
+
+ auto leader_id = co_await env.new_server(true);
+
+ // Wait for the server to elect itself as a leader.
+ assert(co_await wait_for_leader<ExReg>{}(env, {leader_id}, timer, 1000_t) == leader_id);
+
+
+ size_t no_servers = 5;
+ std::unordered_set<raft::server_id> servers{leader_id};
+ for (size_t i = 1; i < no_servers; ++i) {
+ servers.insert(co_await env.new_server(false));
+ }
+
+ assert(std::holds_alternative<std::monostate>(
+ co_await env.get_server(leader_id).reconfigure(
+ std::vector<raft::server_id>{servers.begin(), servers.end()}, timer.now() + 100_t, timer)));
+
+ auto threads = operation::make_thread_set(servers.size() + 1);
+ auto nemesis_thread = some(threads);
+
+ auto seed = tests::random::get_int<int32_t>();
+
+ // TODO: make it dynamic based on the current configuration
+ std::unordered_set<raft::server_id>& known = servers;
+
+ raft_call<ExReg>::state_type db_call_state {
+ .env = env,
+ .known = known,
+ .timer = timer
+ };
+
+ network_majority_grudge<ExReg>::state_type network_majority_grudge_state {
+ .env = env,
+ .known = known,
+ .timer = timer,
+ .rnd = std::mt19937{seed}
+ };
+
+ auto init_state = op_type::state_type{std::move(db_call_state), std::move(network_majority_grudge_state)};
+
+ using namespace generator;
+
+ // For reference to ``real life'' suppose 1_t ~= 10ms. Then:
+ // 10_t (server tick) ~= 100ms
+ // network delay = 3_t ~= 30ms
+ // election timeout = 10 server ticks = 100_t ~= 1s
+ // thus, to enforce leader election, need a majority to convict the current leader for > 100_t ~= 1s,
+ // failure detector convict threshold = 50 srv ticks = 500_t ~= 5s
+ // so need to partition for > 600_t ~= 6s
+ // choose network partition duration uniformly from [600_t-600_t/3, 600_t+600_t/3] = [400_t, 800_t]
+ // ~= [4s, 8s] -> ~1/2 partitions should cause an election
+ // we will set request timeout 600_t ~= 6s and partition every 1200_t ~= 12s
+
+ auto gen = op_limit(500,
+ pin(nemesis_thread,
+ stagger(seed, timer.now() + 200_t, 1200_t, 1200_t,
+ random(seed, [] (std::mt19937& engine) {
+ static std::uniform_int_distribution<raft::logical_clock::rep> dist{400, 800};
+ return op_type{network_majority_grudge<ExReg>{raft::logical_clock::duration{dist(engine)}}};
+ })
+ ),
+ random(seed, [] (std::mt19937& engine) {
+ return op_type{raft_call<ExReg>{ExReg::exchange{engine()}, 200_t}};
+ })
+ )
+ );
+
+ class history_recorder {
+ history_t& _h;
+
+ public:
+ history_recorder(history_t& h) : _h(h) {}
+
+ void operator()(op_type o) {
+ _h.emplace_back(std::move(o));
+ }
+
+ void operator()(operation::completion<op_type> o) {
+ _h.emplace_back(std::move(o));
+ }
+ };
+
+ history_t history;
+ interpreter<op_type, decltype(gen), history_recorder> interp{
+ std::move(gen), std::move(threads), 1_t, std::move(init_state), timer,
+ history_recorder(history)};
+ co_await interp.run();
+
+ co_return history;
+ });
+
+ tlogger.debug("history: {}", history);
+}
--
2.31.1
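
Note on the patch above: the way `network_majority_grudge` splits the cluster can be sketched as plain vector manipulation. The sketch assumes the node list is already shuffled and that the caller knows the current leader (if any); `split_with_leader_in_minority` and `grudges` are hypothetical helper names, and `server_id` is an integer stand-in for `raft::server_id`.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <optional>
#include <utility>
#include <vector>

using server_id = int; // hypothetical stand-in for raft::server_id

// Splits `nodes` into [0, mid) and [mid, size), where mid = size / 2.
// For an odd number of nodes the first part is the minority; if the leader
// sits in the second part, it is swapped with the first node so that it
// ends up in the minority. Returns mid.
std::size_t split_with_leader_in_minority(std::vector<server_id>& nodes,
                                          std::optional<server_id> leader) {
    std::size_t mid = nodes.size() / 2;
    if (nodes.size() % 2 && leader) {
        auto second_half = nodes.begin() + static_cast<std::ptrdiff_t>(mid);
        auto it = std::find(second_half, nodes.end(), *leader);
        if (it != nodes.end()) {
            std::swap(nodes.front(), *it);
        }
    }
    return mid;
}

// Lists every ordered pair of servers that must not reach each other.
// The partition is symmetric, so each cross pair appears in both directions.
std::vector<std::pair<server_id, server_id>> grudges(
        const std::vector<server_id>& nodes, std::size_t mid) {
    std::vector<std::pair<server_id, server_id>> out;
    for (std::size_t x = 0; x < mid; ++x) {
        for (std::size_t y = mid; y < nodes.size(); ++y) {
            out.emplace_back(nodes[x], nodes[y]);
            out.emplace_back(nodes[y], nodes[x]);
        }
    }
    return out;
}
```

For 5 nodes this gives a 2 vs 3 split with 12 directed grudges, which is the quadratic growth the comment in the patch warns about for larger clusters.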

Kamil Braun

<kbraun@scylladb.com>
Jul 28, 2021, 6:08:30 AM7/28/21
to scylladb-dev, Tomasz Grabiec, Gleb Natapov, Konstantin Osipov
Rebased the branch onto latest master (the improvements patchset is already merged).
cc @Tomasz Grabiec I think it's ready, could you review please?

Kamil Braun

<kbraun@scylladb.com>
Aug 3, 2021, 5:17:46 AM8/3/21
to scylladb-dev, Tomasz Grabiec, Gleb Natapov, Konstantin Osipov

Tomasz Grabiec

<tgrabiec@scylladb.com>
unread,
Aug 4, 2021, 8:39:38 PM8/4/21
to Kamil Braun, scylladb-dev, Gleb Natapov, Konstantin Osipov
LGTM.

Needs rebasing.



On Wed, Jul 7, 2021 at 11:43 AM Kamil Braun <kbr...@scylladb.com> wrote:

Kamil Braun

<kbraun@scylladb.com>
Aug 16, 2021, 7:14:58 AM8/16/21
to Tomasz Grabiec, scylladb-dev, Gleb Natapov, Konstantin Osipov
@Tomasz Grabiec
I guess there's no point in sending another patchset, I rebased the branch for merging:

Kamil Braun

<kbraun@scylladb.com>
Aug 16, 2021, 7:15:56 AM8/16/21
to Tomasz Grabiec, scylladb-dev, Gleb Natapov, Konstantin Osipov
Actually, please don't merge yet, I need to check something first.

Kamil Braun

<kbraun@scylladb.com>
Aug 16, 2021, 7:22:06 AM8/16/21
to Tomasz Grabiec, scylladb-dev, Gleb Natapov, Konstantin Osipov
So it looks like after the apply index sanity check was merged... this basic test fails in debug. Interesting. Need to debug on Thursday...

Kamil Braun

<kbraun@scylladb.com>
Aug 16, 2021, 7:28:12 AM8/16/21
to Tomasz Grabiec, scylladb-dev, Gleb Natapov, Konstantin Osipov
No, it's not this test. The test which is failing is already in master! (snapshot_uses_correct_term_test)
So it's a separate issue which we need to fix anyway, and I think there's no point in delaying this patchset any further.

Tomasz Grabiec

<tgrabiec@scylladb.com>
Aug 16, 2021, 9:39:49 AM8/16/21
to Kamil Braun, scylladb-dev, Gleb Natapov, Konstantin Osipov
Is this commit intended to be there:

 TODO (regr test, disable?): raft: avoid sticky leadership rule in some cases
    

On Mon, Aug 16, 2021 at 1:14 PM Kamil Braun <kbr...@scylladb.com> wrote:

Kamil Braun

<kbraun@scylladb.com>
Aug 16, 2021, 9:42:26 AM8/16/21
to Tomasz Grabiec, scylladb-dev, Gleb Natapov, Konstantin Osipov
No, but... I don't see it on this branch (https://github.com/kbr-/scylla/commits/raft-test-generator-v4), can you send a link to it?

Kamil Braun

<kbraun@scylladb.com>
Aug 16, 2021, 9:49:12 AM8/16/21
to Tomasz Grabiec, scylladb-dev, Gleb Natapov, Konstantin Osipov
Maybe github performed some weird caching, the branch had this commit earlier but shouldn't any longer.

Tomasz Grabiec

<tgrabiec@scylladb.com>
Aug 16, 2021, 9:54:47 AM8/16/21
to Kamil Braun, scylladb-dev, Gleb Natapov, Konstantin Osipov
Now it's not there. Please avoid force pushing :)

Kamil Braun

<kbraun@scylladb.com>
Aug 16, 2021, 9:56:12 AM8/16/21
to Tomasz Grabiec, scylladb-dev, Gleb Natapov, Konstantin Osipov
It looks like github does not guarantee read-your-writes consistency when you use force pushing.