[QUEUED scylla next] Merge "Parallelize multishard_combining_reader_as_mutation_source test" from Pavel E

0 views
Skip to first unread message

Commit Bot

<bot@cloudius-systems.com>
unread,
Jul 29, 2021, 4:39:54 AM 7/29/21
to scylladb-dev@googlegroups.com, Avi Kivity
From: Avi Kivity <a...@scylladb.com>
Committer: Avi Kivity <a...@scylladb.com>
Branch: next

Merge "Parallelize multishard_combining_reader_as_mutation_source test" from Pavel E

"
This is the 3rd slowest test in the set. There are 3 cases out
there that are hard-coded to be sequential. However, splitting
them into boost test cases helps running this test faster in
--parallel-cases mode. Timings for debug mode:

Total before the patch: 25 min
Sequential after the patch: 25 min
Basic case: 5 min
Evict-paused-readers case: 5 min
Single-mutation-buffer case: 15 min

tests: unit.multishard_combining_reader_as_mutation_source(debug)
"

* 'br-parallel-mcr-test' of https://github.com/xemul/scylla:
test: Split test_multishard_combining_reader_as_mutation_source into 3
test: Fix indentation after previous patch
test: Move out internals of test_multishard_combining_reader_as_mutation_source

---
diff --git a/test/boost/multishard_combining_reader_as_mutation_source_test.cc b/test/boost/multishard_combining_reader_as_mutation_source_test.cc
--- a/test/boost/multishard_combining_reader_as_mutation_source_test.cc
+++ b/test/boost/multishard_combining_reader_as_mutation_source_test.cc
@@ -41,92 +41,108 @@
#include "schema_registry.hh"
#include "service/priority_manager.hh"

-// Best run with SMP >= 2
-SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_as_mutation_source) {
- if (smp::count < 2) {
- std::cerr << "Cannot run test " << get_name() << " with smp::count < 2" << std::endl;
- return;
- }
+// It has to be a container that does not invalidate pointers
+static std::list<dummy_sharder> keep_alive_sharder;

- // It has to be a container that does not invalidate pointers
- std::list<dummy_sharder> keep_alive_sharder;
+static auto make_populate(bool evict_paused_readers, bool single_fragment_buffer) {
+ return [evict_paused_readers, single_fragment_buffer] (schema_ptr s, const std::vector<mutation>& mutations) mutable {
+ // We need to group mutations that have the same token so they land on the same shard.
+ std::map<dht::token, std::vector<frozen_mutation>> mutations_by_token;

- do_with_cql_env_thread([&] (cql_test_env& env) -> future<> {
- auto make_populate = [&] (bool evict_paused_readers, bool single_fragment_buffer) {
- return [&, evict_paused_readers, single_fragment_buffer] (schema_ptr s, const std::vector<mutation>& mutations) mutable {
- // We need to group mutations that have the same token so they land on the same shard.
- std::map<dht::token, std::vector<frozen_mutation>> mutations_by_token;
-
- for (const auto& mut : mutations) {
- mutations_by_token[mut.token()].push_back(freeze(mut));
- }
+ for (const auto& mut : mutations) {
+ mutations_by_token[mut.token()].push_back(freeze(mut));
+ }

- dummy_sharder sharder(s->get_sharder(), mutations_by_token);
+ dummy_sharder sharder(s->get_sharder(), mutations_by_token);

- auto merged_mutations = boost::copy_range<std::vector<std::vector<frozen_mutation>>>(mutations_by_token | boost::adaptors::map_values);
+ auto merged_mutations = boost::copy_range<std::vector<std::vector<frozen_mutation>>>(mutations_by_token | boost::adaptors::map_values);

- auto remote_memtables = make_lw_shared<std::vector<foreign_ptr<lw_shared_ptr<memtable>>>>();
- for (unsigned shard = 0; shard < sharder.shard_count(); ++shard) {
- auto remote_mt = smp::submit_to(shard, [shard, gs = global_schema_ptr(s), &merged_mutations, sharder] {
- auto s = gs.get();
- auto mt = make_lw_shared<memtable>(s);
+ auto remote_memtables = make_lw_shared<std::vector<foreign_ptr<lw_shared_ptr<memtable>>>>();
+ for (unsigned shard = 0; shard < sharder.shard_count(); ++shard) {
+ auto remote_mt = smp::submit_to(shard, [shard, gs = global_schema_ptr(s), &merged_mutations, sharder] {
+ auto s = gs.get();
+ auto mt = make_lw_shared<memtable>(s);

- for (unsigned i = shard; i < merged_mutations.size(); i += sharder.shard_count()) {
- for (auto& mut : merged_mutations[i]) {
- mt->apply(mut.unfreeze(s));
- }
- }
-
- return make_foreign(mt);
- }).get0();
- remote_memtables->emplace_back(std::move(remote_mt));
+ for (unsigned i = shard; i < merged_mutations.size(); i += sharder.shard_count()) {
+ for (auto& mut : merged_mutations[i]) {
+ mt->apply(mut.unfreeze(s));
+ }
}
- keep_alive_sharder.push_back(sharder);
-
- return mutation_source([&, remote_memtables, evict_paused_readers, single_fragment_buffer] (schema_ptr s,
- reader_permit permit,
- const dht::partition_range& range,
- const query::partition_slice& slice,
- const io_priority_class& pc,
- tracing::trace_state_ptr trace_state,
- streamed_mutation::forwarding fwd_sm,
- mutation_reader::forwarding fwd_mr) mutable {
- auto factory = [remote_memtables, single_fragment_buffer] (
- schema_ptr s,
- reader_permit permit,
- const dht::partition_range& range,
- const query::partition_slice& slice,
- const io_priority_class& pc,
- tracing::trace_state_ptr trace_state,
- mutation_reader::forwarding fwd_mr) {
- auto reader = remote_memtables->at(this_shard_id())->make_flat_reader(s, std::move(permit), range, slice, pc, std::move(trace_state),
- streamed_mutation::forwarding::no, fwd_mr);
- if (single_fragment_buffer) {
- reader.set_max_buffer_size(1);
- }
- return reader;
- };
-
- auto lifecycle_policy = seastar::make_shared<test_reader_lifecycle_policy>(std::move(factory), evict_paused_readers);
- auto mr = make_multishard_combining_reader_for_tests(keep_alive_sharder.back(), std::move(lifecycle_policy), s,
- std::move(permit), range, slice, pc, trace_state, fwd_mr);
- if (fwd_sm == streamed_mutation::forwarding::yes) {
- return make_forwardable(std::move(mr));
+
+ return make_foreign(mt);
+ }).get0();
+ remote_memtables->emplace_back(std::move(remote_mt));
+ }
+ keep_alive_sharder.push_back(sharder);
+
+ return mutation_source([&, remote_memtables, evict_paused_readers, single_fragment_buffer] (schema_ptr s,
+ reader_permit permit,
+ const dht::partition_range& range,
+ const query::partition_slice& slice,
+ const io_priority_class& pc,
+ tracing::trace_state_ptr trace_state,
+ streamed_mutation::forwarding fwd_sm,
+ mutation_reader::forwarding fwd_mr) mutable {
+ auto factory = [remote_memtables, single_fragment_buffer] (
+ schema_ptr s,
+ reader_permit permit,
+ const dht::partition_range& range,
+ const query::partition_slice& slice,
+ const io_priority_class& pc,
+ tracing::trace_state_ptr trace_state,
+ mutation_reader::forwarding fwd_mr) {
+ auto reader = remote_memtables->at(this_shard_id())->make_flat_reader(s, std::move(permit), range, slice, pc, std::move(trace_state),
+ streamed_mutation::forwarding::no, fwd_mr);
+ if (single_fragment_buffer) {
+ reader.set_max_buffer_size(1);
}
- return mr;
- });
+ return reader;
};
- };

- testlog.info("run_mutation_source_tests(evict_readers=false, single_fragment_buffer=false)");
+ auto lifecycle_policy = seastar::make_shared<test_reader_lifecycle_policy>(std::move(factory), evict_paused_readers);
+ auto mr = make_multishard_combining_reader_for_tests(keep_alive_sharder.back(), std::move(lifecycle_policy), s,
+ std::move(permit), range, slice, pc, trace_state, fwd_mr);
+ if (fwd_sm == streamed_mutation::forwarding::yes) {
+ return make_forwardable(std::move(mr));
+ }
+ return mr;
+ });
+ };
+}
+
+// Best run with SMP >= 2
+SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader) {
+ if (smp::count < 2) {
+ std::cerr << "Cannot run test " << get_name() << " with smp::count < 2" << std::endl;
+ return;
+ }
+
+ do_with_cql_env_thread([&] (cql_test_env& env) -> future<> {
run_mutation_source_tests(make_populate(false, false));
+ return make_ready_future<>();
+ }).get();
+}
+
+SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_evict_paused) {
+ if (smp::count < 2) {
+ std::cerr << "Cannot run test " << get_name() << " with smp::count < 2" << std::endl;
+ return;
+ }

- testlog.info("run_mutation_source_tests(evict_readers=true, single_fragment_buffer=false)");
+ do_with_cql_env_thread([&] (cql_test_env& env) -> future<> {
run_mutation_source_tests(make_populate(true, false));
+ return make_ready_future<>();
+ }).get();
+}

- testlog.info("run_mutation_source_tests(evict_readers=true, single_fragment_buffer=true)");
- run_mutation_source_tests(make_populate(true, true));
+SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_with_tiny_buffer) {
+ if (smp::count < 2) {
+ std::cerr << "Cannot run test " << get_name() << " with smp::count < 2" << std::endl;
+ return;
+ }

+ do_with_cql_env_thread([&] (cql_test_env& env) -> future<> {
+ run_mutation_source_tests(make_populate(true, true));
return make_ready_future<>();
}).get();
}

Commit Bot

<bot@cloudius-systems.com>
unread,
Jul 29, 2021, 1:20:00 PM 7/29/21
to scylladb-dev@googlegroups.com, Avi Kivity
From: Avi Kivity <a...@scylladb.com>
Committer: Avi Kivity <a...@scylladb.com>
Branch: master
Reply all
Reply to author
Forward
0 new messages