Operations and generators can be composed to create more complex
operations and generators. There are certain composition patterns useful
for many different test scenarios.
This commit introduces a couple of such patterns. For example:
- Given multiple different operation types, we can create a new
operation type - `either_of` - which is a "union" of the original
operation types. Executing an `either_of` operation means executing an
operation of one of the original types, but the specific type
can be chosen at runtime.
- Given a generator `g`, `op_limit(n, g)` is a new generator which
limits the number of operations produced by `g`.
- Given a generator `g` and a time duration of `d` ticks, `stagger(g, d)` is a
new generator which spreads the operations from `g` roughly every `d`
ticks. (The actual definition in code is more general and complex but
the idea is similar.)
And so on.
Some of these patterns have corresponding notions in Jepsen, e.g. our
`stagger` has a corresponding `stagger` in Jepsen (although our
`stagger` is more general).
---
test/raft/generator.hh | 408 +++++++++++++++++++++++++++++++++++++++++
1 file changed, 408 insertions(+)
diff --git a/test/raft/generator.hh b/test/raft/generator.hh
index e4004b9e8..2bc50ca09 100644
--- a/test/raft/generator.hh
+++ b/test/raft/generator.hh
@@ -22,6 +22,10 @@
#pragma once
#include <unordered_set>
+#include <random>
+
+#include <boost/mp11/algorithm.hpp>
+#include <boost/implicit_cast.hpp>
#include <seastar/util/variant_utils.hh>
#include "utils/chunked_vector.hh"
@@ -419,6 +423,410 @@ class interpreter {
namespace operation {
+// Turns an Executable operation type into an Invocable one: everything from `Op`
+// is inherited and two optional scheduling fields are added on top.
+template <Executable Op>
+struct invocable : Op {
+    // Reuse the constructors and the `execute` implementation of the wrapped operation.
+    using Op::Op;
+    using Op::execute;
+
+    // Re-export the associated types of the wrapped operation.
+    using typename Op::result_type;
+    using typename Op::state_type;
+
+    // Logical time at which the operation is ready, if one was specified.
+    std::optional<raft::logical_clock::time_point> ready;
+    // Thread assigned to the operation, if one was specified.
+    std::optional<thread_id> thread;
+};
+
+// Given a non-empty set of Executable operation types, create an Executable operation type representing their union (sum type).
+//
+// The state type of the union is a product of the original state types,
+// i.e. the state of the union contains the states of each operation.
+// The result type is a union of the original result types.
+//
+// An operation of this type is an operation of one of the original types.
+// The state provided to `execute` is a projection of the product state
+// onto the state type corresponding to the original operation type.
+template <Executable... Ops>
+struct either_of {
+ // An `||` fold over an empty pack evaluates to `false`, so this fires exactly when `Ops` is empty.
+ static_assert((std::is_same_v<Ops, Ops> || ...)); // pack is non-empty
+
+ // The wrapped operation; the active alternative decides which of the `Ops` gets executed.
+ std::variant<Ops...> op;
+
+ // Both are indexed in the same order as `Ops`: alternative I of `op` produces
+ // alternative I of `result_type` and uses element I of `state_type`.
+ using result_type = std::variant<typename Ops::result_type...>;
+ using state_type = std::tuple<typename Ops::state_type...>;
+
+ // Executes the wrapped operation.
+ // `mp_with_index` turns the runtime index of the active alternative into a compile-time
+ // index `I`; the I-th operation is then run against the I-th element of `s` and its
+ // result is wrapped into `result_type`.
+ // The lambda captures `s`, `ctx` and `this` without copying them.
+ future<result_type> execute(state_type& s, const context& ctx) {
+ // `co_return co_await` instead of simply `return` to keep the lambda alive during `execute`
+ co_return co_await boost::mp11::mp_with_index<std::tuple_size_v<state_type>>(op.index(),
+ [&s, &ctx, this] (auto I) -> future<result_type> {
+ co_return result_type{co_await std::get<I>(op).execute(std::get<I>(s), ctx)};
+ });
+ }
+
+ // Wraps an operation whose type is one of `Ops` (implicit conversion is intended:
+ // the constructor is not `explicit`).
+ template <Executable Op>
+ requires (std::is_same_v<Op, Ops> || ...) // Ops contain Op
+ either_of(Op o) : op(std::move(o)) {
+ // Checked inside a member function body, where `either_of` is a complete type.
+ static_assert(Executable<either_of<Ops...>>);
+ }
+
+ // Streams the wrapped variant.
+ // NOTE(review): relies on an `operator<<` for `std::variant<Ops...>` declared elsewhere — not visible here.
+ friend std::ostream& operator<<(std::ostream& os, const either_of& e) {
+ return os << e.op;
+ }
+};
+
+} // namespace operation
+
+// Wrapper giving copy/move assignment to a type that is only copy/move constructible.
+template <typename T>
+requires std::is_nothrow_move_constructible_v<T>
+class assignable {
+    // Never empty. `std::optional` is used only for `emplace`, which rebuilds
+    // the value in place without needing `T` to be assignable.
+    std::optional<T> _value;
+
+public:
+    assignable(T v) : _value(std::move(v)) {}
+    assignable(const assignable&) = default;
+    assignable(assignable&&) = default;
+
+    // The source is taken by value, so this one overload serves as both copy and move assignment.
+    // The held value is replaced by one move-constructed from the parameter; the
+    // requires-clause on the class guarantees that this move construction cannot throw.
+    assignable& operator=(assignable other) {
+        T& incoming = *other._value;
+        _value.emplace(std::move(incoming));
+        return *this;
+    }
+
+    // Implicit access to the wrapped value.
+    operator const T&() const { return *_value; }
+    operator T&() { return *_value; }
+};
+
+namespace generator {
+
+// Generator that produces operations by calling a factory with a pseudo-random engine.
+// Created with `random`.
+template <std::invocable<std::mt19937&> F>
+requires std::is_nothrow_move_constructible_v<F>
+struct random_gen {
+    using operation_type = std::invoke_result_t<F, std::mt19937&>;
+
+    // Works on copies only: a copy of the factory is called with a copy of the engine.
+    // `*this` is left unchanged; the advanced engine is carried by the returned generator.
+    op_and_gen<operation_type, random_gen> fetch_op(const context&) const {
+        std::mt19937 advanced = _engine;
+        auto produced = boost::implicit_cast<F>(_f)(advanced);
+        return {std::move(produced), random_gen{std::move(advanced), _f}};
+    }
+
+    // Engine state from which the next operation is produced.
+    std::mt19937 _engine;
+    // The operation factory, wrapped so that the generator is assignable.
+    assignable<F> _f;
+};
+
+// Builds a generator of an infinite random sequence of operations.
+// `f` is an operation factory that receives a pseudo-random number generator;
+// the engine handed to it is seeded with `seed`.
+template <std::invocable<std::mt19937&> F>
+requires std::is_nothrow_move_constructible_v<F>
+random_gen<F> random(int seed, F f) {
+    using produced_op = std::invoke_result_t<F, std::mt19937&>;
+    static_assert(GeneratorOf<random_gen<F>, produced_op>);
+
+    std::mt19937 engine{seed};
+    return random_gen<F>{std::move(engine), std::move(f)};
+}
+
+// Generator that calls a nullary factory for every operation and never changes state.
+// Created with `constant`.
+template <std::invocable<> F>
+requires std::is_nothrow_move_constructible_v<F>
+struct constant_gen {
+    using operation_type = std::invoke_result_t<F>;
+
+    // Calls a copy of the factory; the continuation is a copy of `*this`.
+    op_and_gen<operation_type, constant_gen> fetch_op(const context&) const {
+        auto produced = boost::implicit_cast<F>(_f)();
+        return {std::move(produced), *this};
+    }
+
+    // The operation factory, wrapped so that the generator is assignable.
+    assignable<F> _f;
+};
+
+// Builds a generator of an infinite constant sequence of operations:
+// each operation is obtained by calling the argument-less factory `f`.
+template <std::invocable<> F>
+requires std::is_nothrow_move_constructible_v<F>
+constant_gen<F> constant(F f) {
+    using produced_op = std::invoke_result_t<F>;
+    static_assert(GeneratorOf<constant_gen<F>, produced_op>);
+
+    return constant_gen<F>{std::move(f)};
+}
+
+// Generator that feeds consecutive integers to a factory, one per operation.
+// Created with `sequence`.
+template <std::invocable<int32_t> F>
+requires std::is_nothrow_move_constructible_v<F>
+struct sequence_gen {
+    using operation_type = std::invoke_result_t<F, int32_t>;
+
+    // Calls a copy of the factory with `_next`; the continuation holds `_next + 1`.
+    op_and_gen<operation_type, sequence_gen> fetch_op(const context&) const {
+        auto produced = boost::implicit_cast<F>(_f)(_next);
+        sequence_gen continuation{_next + 1, _f};
+        return {std::move(produced), std::move(continuation)};
+    }
+
+    // Integer passed to the factory on the next fetch.
+    int32_t _next;
+    // The operation factory, wrapped so that the generator is assignable.
+    assignable<F> _f;
+};
+
+// Builds a generator of an infinite sequence of operations in which the k-th operation
+// (counting from zero) is obtained by calling the factory `f` with the integer `start + k`.
+template <std::invocable<int32_t> F>
+requires std::is_nothrow_move_constructible_v<F>
+sequence_gen<F> sequence(int32_t start, F f) {
+    using produced_op = std::invoke_result_t<F, int32_t>;
+    static_assert(GeneratorOf<sequence_gen<F>, produced_op>);
+
+    return sequence_gen<F>{start, std::move(f)};
+}
+
+// Generator adaptor that lets through at most `_remaining` more operations of `_g`.
+// Created with `op_limit`.
+template <Generator G>
+struct op_limit_gen {
+    using operation_type = typename G::operation_type;
+
+    // Returns `finished` without consulting `_g` once the budget is used up.
+    // Otherwise forwards whatever `_g` returns, together with the continuation of `_g`.
+    //
+    // The budget is reduced only when `_g` yields an actual operation. A `pending` or
+    // `finished` result is not an operation, so it must not count against the limit:
+    // otherwise a caller that keeps the returned generator after `pending` would end up
+    // with fewer than `limit` operations.
+    op_and_gen<operation_type, op_limit_gen> fetch_op(const context& ctx) const {
+        if (!_remaining) {
+            return {finished{}, *this};
+        }
+
+        auto [op, new_g] = _g.fetch_op(ctx);
+        const bool produced_operation = std::get_if<operation_type>(&op) != nullptr;
+        const size_t remaining = produced_operation ? _remaining - 1 : _remaining;
+        return {std::move(op), op_limit_gen{remaining, std::move(new_g)}};
+    }
+
+    // Number of operations that may still be produced.
+    size_t _remaining;
+    // The wrapped generator.
+    G _g;
+};
+
+// Caps the number of operations produced by `g`: the resulting generator yields only
+// the first `limit` operations of `g`.
+// When `g` finishes before reaching the cap, wrapping it in `op_limit` has no observable effect.
+template <Generator G>
+op_limit_gen<G> op_limit(size_t limit, G g) {
+    using limited = op_limit_gen<G>;
+    static_assert(GeneratorOf<limited, typename G::operation_type>);
+
+    return limited{limit, std::move(g)};
+}
+
+// Generator adaptor that shows the wrapped generator only the threads accepted by `_p`
+// and makes sure every produced operation has a thread assigned. Created with `on_threads`.
+template <Generator G, std::predicate<operation::thread_id> P>
+requires operation::HasThread<typename G::operation_type>
+struct on_threads_gen {
+    using operation_type = typename G::operation_type;
+
+    op_and_gen<operation_type, on_threads_gen> fetch_op(const context& ctx) const {
+        // Collects the threads from `source` that are accepted by the predicate.
+        const auto accepted = [this] (const auto& source) {
+            operation::thread_set kept;
+            kept.reserve(source.size());
+            for (auto& tid : source) {
+                if (_p(tid)) {
+                    kept.insert(tid);
+                }
+            }
+            return kept;
+        };
+
+        // No thread at all satisfies the predicate: nothing can ever be produced.
+        operation::thread_set usable_threads = accepted(ctx.all_threads);
+        if (usable_threads.empty()) {
+            return {finished{}, *this};
+        }
+
+        // Some thread satisfies the predicate, but none of those is free right now.
+        operation::thread_set idle_threads = accepted(ctx.free_threads);
+        if (idle_threads.empty()) {
+            return {pending{}, *this};
+        }
+
+        // Query the wrapped generator with a context restricted to the accepted threads.
+        auto [op, new_g] = _g.fetch_op(context {
+            .now = ctx.now,
+            .all_threads = usable_threads,
+            .free_threads = idle_threads
+        });
+
+        if (auto* invoked = std::get_if<operation_type>(&op)) {
+            if (!invoked->thread) {
+                // The wrapped generator left the thread unspecified, so pick one here.
+                invoked->thread = some(idle_threads);
+            } else {
+                // The wrapped generator is required to respect `free_threads`.
+                assert(idle_threads.contains(*invoked->thread));
+            }
+        }
+
+        return {std::move(op), on_threads_gen{_p, std::move(new_g)}};
+    }
+
+    // Predicate selecting the threads the wrapped generator may use.
+    P _p;
+    // The wrapped generator.
+    G _g;
+};
+
+// Restricts a generator, whose operations understand threads, to the threads accepted by `p`.
+// Every operation produced by the result specifies a thread, and that thread satisfies `p`.
+//
+// The result finishes when `g` finishes or when no thread satisfies the predicate.
+// It returns `pending` when some thread satisfies the predicate but none of the
+// satisfying threads is currently free.
+//
+// Requirement on `g`: it must respect `free_threads`, that is, whenever an operation
+// it returns specifies a thread, that thread must belong to the `free_threads`
+// passed to it in the context.
+template <Generator G, std::predicate<operation::thread_id> P>
+requires operation::HasThread<typename G::operation_type>
+on_threads_gen<G, P> on_threads(P p, G g) {
+    using restricted = on_threads_gen<G, P>;
+    static_assert(GeneratorOf<restricted, typename G::operation_type>);
+
+    return restricted{std::move(p), std::move(g)};
+}
+
+// Orders two generator return values by which one is ``sooner''; smaller means sooner.
+//
+// An operation is sooner than `pending`, and `pending` is sooner than `finished`.
+// Two operations are ordered by their ready times. An operation without a ready time
+// is to be executed ``as soon as possible'', so for the comparison its ready time
+// is taken to be `now`.
+template <typename Op>
+requires operation::HasReadyTime<Op>
+std::weak_ordering sooner(const op_ret<Op>& r1, const op_ret<Op>& r2, raft::logical_clock::time_point now) {
+    const Op* lhs = std::get_if<Op>(&r1);
+    const Op* rhs = std::get_if<Op>(&r2);
+
+    if (!lhs || !rhs) {
+        // At least one side is `pending` or `finished`. The alternatives are indexed
+        // operation = 0, pending = 1, finished = 2, so comparing the indices gives
+        // exactly the required order.
+        return r1.index() <=> r2.index();
+    }
+
+    // Ready time to use for the comparison: the specified one, or `now` if there is none.
+    const auto effective_time = [now] (const Op& o) {
+        return o.ready ? *o.ready : now;
+    };
+
+    return effective_time(*lhs) <=> effective_time(*rhs);
+}
+
+// Generator that merges two generators of the same operation type, always taking the
+// result that is `sooner`. Created with `either`.
+template <Generator G1, Generator G2>
+requires std::is_same_v<typename G1::operation_type, typename G2::operation_type> && operation::HasReadyTime<typename G1::operation_type>
+struct either_gen {
+    using operation_type = typename G1::operation_type;
+
+    // Asks both generators, then keeps the sooner result. Only the generator whose
+    // result is returned is advanced; the other one is carried over unchanged.
+    op_and_gen<operation_type, either_gen> fetch_op(const context& ctx) const {
+        auto [op1, new_g1] = _g1.fetch_op(ctx);
+        auto [op2, new_g2] = _g2.fetch_op(ctx);
+
+        const auto ord = sooner(op1, op2, ctx.now);
+        const bool tie = ord == std::weak_ordering::equivalent;
+
+        // On a tie the stored preference decides; otherwise the strictly sooner side wins.
+        const bool take_g1 = tie ? _use_g1_on_tie : ord == std::weak_ordering::less;
+        // The preference flips after every tie and is kept otherwise.
+        const bool next_use_g1_on_tie = _use_g1_on_tie != tie;
+
+        if (!take_g1) {
+            return {std::move(op2), either_gen{next_use_g1_on_tie, _g1, std::move(new_g2)}};
+        }
+
+        return {std::move(op1), either_gen{next_use_g1_on_tie, std::move(new_g1), _g2}};
+    }
+
+    // Which generator wins the next tie.
+    bool _use_g1_on_tie = true;
+
+    G1 _g1;
+    G2 _g2;
+};
+
+// Merges two generators that produce the same type of time-aware operations.
+// Every operation of the result comes from one of the two underlying generators.
+//
+// Of the two candidates offered by `g1` and `g2`, the one which is `sooner` is chosen.
+// In particular, the result finishes when both generators have finished.
+//
+// Ties alternate: the first tie is resolved in favour of `g1`, and each later tie
+// in favour of the generator that was not used on the previous tie.
+template <Generator G1, Generator G2>
+requires std::is_same_v<typename G1::operation_type, typename G2::operation_type>
+    && operation::HasReadyTime<typename G1::operation_type>
+either_gen<G1, G2> either(G1 g1, G2 g2) {
+    using merged = either_gen<G1, G2>;
+    static_assert(GeneratorOf<merged, typename G1::operation_type>);
+
+    // The tie-breaking flag is left at its default member initializer.
+    return merged{
+        ._g1{std::move(g1)},
+        ._g2{std::move(g2)}
+    };
+}
+
+// Merges two generators of the same operation type (operations which understand both
+// threads and time) while pinning the first one to a single thread: every operation
+// coming from `g1` is assigned to thread `to`, and every operation coming from `g2`
+// is assigned to some other thread.
+//
+// Ties between the two generators are resolved as in `either`.
+//
+// Both underlying generators are assumed to respect `free_threads` (as in `on_threads`).
+template <Generator G1, Generator G2>
+requires std::is_same_v<typename G1::operation_type, typename G2::operation_type>
+    && operation::HasReadyTime<typename G1::operation_type>
+    && operation::HasThread<typename G1::operation_type>
+GeneratorOf<typename G1::operation_type> auto pin(operation::thread_id to, G1 g1, G2 g2) {
+    // A named function object is used instead of a lambda because the copy constructor is needed.
+    struct is_pinned_thread {
+        operation::thread_id _to;
+        bool operator()(const operation::thread_id& tid) const { return tid == _to; }
+    };
+
+    // Written out by hand instead of using std::not_fn, for the same reason.
+    struct is_other_thread {
+        operation::thread_id _to;
+        bool operator()(const operation::thread_id& tid) const { return tid != _to; }
+    };
+
+    auto pinned_side = on_threads(is_pinned_thread{to}, std::move(g1));
+    auto other_side = on_threads(is_other_thread{to}, std::move(g2));
+    auto gen = either(std::move(pinned_side), std::move(other_side));
+
+    static_assert(GeneratorOf<decltype(gen), typename G1::operation_type>);
+    return gen;
+}
+
+// Generator adaptor that delays operations of the wrapped generator so that the i-th
+// operation is ready no earlier than the i-th point of a randomly spaced time sequence.
+// Created with `stagger`.
+template <Generator G>
+requires operation::HasReadyTime<typename G::operation_type>
+struct stagger_gen {
+    using operation_type = typename G::operation_type;
+    using distribution_type = std::uniform_int_distribution<raft::logical_clock::rep>;
+
+    op_and_gen<operation_type, stagger_gen> fetch_op(const context& ctx) const {
+        auto [res, new_g] = _g.fetch_op(ctx);
+
+        auto* produced = std::get_if<operation_type>(&res);
+        if (!produced) {
+            // `pending` or `finished`: pass it through and keep the current state.
+            return {std::move(res), *this};
+        }
+
+        // Push the operation back if it has no ready time or one that is too early.
+        if (!produced->ready || *produced->ready < _next_ready) {
+            produced->ready = _next_ready;
+        }
+
+        // Draw the gap to the following time point from copies of the engine and the
+        // distribution, so that `*this` stays unchanged.
+        auto engine = _engine;
+        distribution_type gap{_delta.param()};
+        auto following_ready = _next_ready + raft::logical_clock::duration{gap(engine)};
+
+        return {std::move(res), stagger_gen{
+            std::move(engine), gap,
+            following_ready, std::move(new_g)}
+        };
+    }
+
+    // Source of randomness for the gaps between time points.
+    std::mt19937 _engine;
+    // Distribution of the gap between consecutive time points, in clock ticks.
+    distribution_type _delta;
+
+    // Earliest ready time allowed for the next operation.
+    raft::logical_clock::time_point _next_ready;
+    // The wrapped generator.
+    G _g;
+};
+
+// Spreads the operations of a time-aware generator over logical time.
+//
+// Starting at `start`, a sequence of time points is produced in which every point follows
+// the previous one by a duration chosen uniformly at random from the interval
+// [`delta_low`, `delta_high`]; the random engine is seeded with `seed`.
+// The `ready` time of each operation produced by `g` is then adjusted so that it is
+// at least as late as the corresponding time point of that sequence.
+//
+// Example: if the randomly chosen time points are [10, 20, 30] and the operations of `g`
+// specify the `ready` times [5, `nullopt`, 34], the operations produced by the new
+// generator have the `ready` times [10, 20, 34].
+//
+// In particular, with `delta = delta_low = delta_high` and a generator whose operations
+// never specify `ready`, the produced operations are scheduled every `delta` ticks.
+template <Generator G>
+requires operation::HasReadyTime<typename G::operation_type>
+stagger_gen<G> stagger(
+        int seed, raft::logical_clock::time_point start,
+        raft::logical_clock::duration delta_low, raft::logical_clock::duration delta_high,
+        G g) {
+    using staggered = stagger_gen<G>;
+    static_assert(GeneratorOf<staggered, typename G::operation_type>);
+
+    return staggered{
+        ._engine{seed},
+        ._delta{delta_low.count(), delta_high.count()},
+        ._next_ready{start},
+        ._g{std::move(g)}
+    };
+}
+
+} // namespace generator
+
+namespace operation {
+
template <typename Op>
std::ostream& operator<<(std::ostream& os, const exceptional_result<Op>& r) {
try {
--
2.31.1