[PATCH 0/3] Parallelize multishard_combining_reader_as_mutation_source test

0 views
Skip to first unread message

Pavel Emelyanov

unread,
Jul 27, 2021, 2:40:38 AMJul 27
to scylla...@googlegroups.com, Pavel Emelyanov
This is the 3rd slowest test in the set. There are 3 cases out
there that are hard-coded to be sequential. However, splitting
them into boost test cases helps running this test faster in
--parallel-cases mode. Timings for debug mode:

Total before the patch: 25 min
Sequential after the patch: 25 min
Basic case: 5 min
Evict-paused-readers case: 5 min
Single-mutation-buffer case: 15 min

branch: https://github.com/xemul/scylla/tree/br-parallel-mcr-test
tests: unit.multishard_combining_reader_as_mutation_source(debug)

Pavel Emelyanov (3):
test: Move out internals of
test_multishard_combining_reader_as_mutation_source
test: Fix indentation after previous patch
test: Split test_multishard_combining_reader_as_mutation_source into 3

...ombining_reader_as_mutation_source_test.cc | 158 ++++++++++--------
1 file changed, 87 insertions(+), 71 deletions(-)

--
2.20.1

Pavel Emelyanov

unread,
Jul 27, 2021, 2:40:38 AMJul 27
to scylla...@googlegroups.com, Pavel Emelyanov
Preparation. They will be called from 3 independent cases.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
...ombining_reader_as_mutation_source_test.cc | 24 +++++++++----------
1 file changed, 12 insertions(+), 12 deletions(-)

diff --git a/test/boost/multishard_combining_reader_as_mutation_source_test.cc b/test/boost/multishard_combining_reader_as_mutation_source_test.cc
index c546f28300..07d56927e0 100644
--- a/test/boost/multishard_combining_reader_as_mutation_source_test.cc
+++ b/test/boost/multishard_combining_reader_as_mutation_source_test.cc
@@ -41,18 +41,10 @@
#include "schema_registry.hh"
#include "service/priority_manager.hh"

-// Best run with SMP >= 2
-SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_as_mutation_source) {
- if (smp::count < 2) {
- std::cerr << "Cannot run test " << get_name() << " with smp::count < 2" << std::endl;
- return;
- }
-
- // It has to be a container that does not invalidate pointers
- std::list<dummy_sharder> keep_alive_sharder;
+// It has to be a container that does not invalidate pointers
+static std::list<dummy_sharder> keep_alive_sharder;

- do_with_cql_env_thread([&] (cql_test_env& env) -> future<> {
- auto make_populate = [&] (bool evict_paused_readers, bool single_fragment_buffer) {
+static auto make_populate(bool evict_paused_readers, bool single_fragment_buffer) {
return [&, evict_paused_readers, single_fragment_buffer] (schema_ptr s, const std::vector<mutation>& mutations) mutable {
// We need to group mutations that have the same token so they land on the same shard.
std::map<dht::token, std::vector<frozen_mutation>> mutations_by_token;
@@ -116,8 +108,16 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_as_mutation_source) {
return mr;
});
};
- };
+}

+// Best run with SMP >= 2
+SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_as_mutation_source) {
+ if (smp::count < 2) {
+ std::cerr << "Cannot run test " << get_name() << " with smp::count < 2" << std::endl;
+ return;
+ }
+
+ do_with_cql_env_thread([&] (cql_test_env& env) -> future<> {
testlog.info("run_mutation_source_tests(evict_readers=false, single_fragment_buffer=false)");
run_mutation_source_tests(make_populate(false, false));

--
2.20.1

Pavel Emelyanov

unread,
Jul 27, 2021, 2:40:39 AMJul 27
to scylla...@googlegroups.com, Pavel Emelyanov
Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
...ombining_reader_as_mutation_source_test.cc | 110 +++++++++---------
1 file changed, 55 insertions(+), 55 deletions(-)

diff --git a/test/boost/multishard_combining_reader_as_mutation_source_test.cc b/test/boost/multishard_combining_reader_as_mutation_source_test.cc
index 07d56927e0..f4eee54ed3 100644
--- a/test/boost/multishard_combining_reader_as_mutation_source_test.cc
+++ b/test/boost/multishard_combining_reader_as_mutation_source_test.cc
@@ -45,69 +45,69 @@
static std::list<dummy_sharder> keep_alive_sharder;

static auto make_populate(bool evict_paused_readers, bool single_fragment_buffer) {
- return [&, evict_paused_readers, single_fragment_buffer] (schema_ptr s, const std::vector<mutation>& mutations) mutable {
- // We need to group mutations that have the same token so they land on the same shard.
- std::map<dht::token, std::vector<frozen_mutation>> mutations_by_token;
+ return [evict_paused_readers, single_fragment_buffer] (schema_ptr s, const std::vector<mutation>& mutations) mutable {
+ // We need to group mutations that have the same token so they land on the same shard.
+ std::map<dht::token, std::vector<frozen_mutation>> mutations_by_token;

- for (const auto& mut : mutations) {
- mutations_by_token[mut.token()].push_back(freeze(mut));
- }
-
- dummy_sharder sharder(s->get_sharder(), mutations_by_token);
+ for (const auto& mut : mutations) {
+ mutations_by_token[mut.token()].push_back(freeze(mut));
+ }

- auto merged_mutations = boost::copy_range<std::vector<std::vector<frozen_mutation>>>(mutations_by_token | boost::adaptors::map_values);
+ dummy_sharder sharder(s->get_sharder(), mutations_by_token);

- auto remote_memtables = make_lw_shared<std::vector<foreign_ptr<lw_shared_ptr<memtable>>>>();
- for (unsigned shard = 0; shard < sharder.shard_count(); ++shard) {
- auto remote_mt = smp::submit_to(shard, [shard, gs = global_schema_ptr(s), &merged_mutations, sharder] {
- auto s = gs.get();
- auto mt = make_lw_shared<memtable>(s);
+ auto merged_mutations = boost::copy_range<std::vector<std::vector<frozen_mutation>>>(mutations_by_token | boost::adaptors::map_values);

- for (unsigned i = shard; i < merged_mutations.size(); i += sharder.shard_count()) {
- for (auto& mut : merged_mutations[i]) {
- mt->apply(mut.unfreeze(s));
- }
- }
+ auto remote_memtables = make_lw_shared<std::vector<foreign_ptr<lw_shared_ptr<memtable>>>>();
+ for (unsigned shard = 0; shard < sharder.shard_count(); ++shard) {
+ auto remote_mt = smp::submit_to(shard, [shard, gs = global_schema_ptr(s), &merged_mutations, sharder] {
+ auto s = gs.get();
+ auto mt = make_lw_shared<memtable>(s);

- return make_foreign(mt);
- }).get0();
- remote_memtables->emplace_back(std::move(remote_mt));
+ for (unsigned i = shard; i < merged_mutations.size(); i += sharder.shard_count()) {
+ for (auto& mut : merged_mutations[i]) {
+ mt->apply(mut.unfreeze(s));
+ }
}
- keep_alive_sharder.push_back(sharder);
-
- return mutation_source([&, remote_memtables, evict_paused_readers, single_fragment_buffer] (schema_ptr s,
- reader_permit permit,
- const dht::partition_range& range,
- const query::partition_slice& slice,
- const io_priority_class& pc,
- tracing::trace_state_ptr trace_state,
- streamed_mutation::forwarding fwd_sm,
- mutation_reader::forwarding fwd_mr) mutable {
- auto factory = [remote_memtables, single_fragment_buffer] (
- schema_ptr s,
- reader_permit permit,
- const dht::partition_range& range,
- const query::partition_slice& slice,
- const io_priority_class& pc,
- tracing::trace_state_ptr trace_state,
- mutation_reader::forwarding fwd_mr) {
- auto reader = remote_memtables->at(this_shard_id())->make_flat_reader(s, std::move(permit), range, slice, pc, std::move(trace_state),
- streamed_mutation::forwarding::no, fwd_mr);
- if (single_fragment_buffer) {
- reader.set_max_buffer_size(1);
- }
- return reader;
- };
-
- auto lifecycle_policy = seastar::make_shared<test_reader_lifecycle_policy>(std::move(factory), evict_paused_readers);
- auto mr = make_multishard_combining_reader_for_tests(keep_alive_sharder.back(), std::move(lifecycle_policy), s,
- std::move(permit), range, slice, pc, trace_state, fwd_mr);
- if (fwd_sm == streamed_mutation::forwarding::yes) {
- return make_forwardable(std::move(mr));
+
+ return make_foreign(mt);
+ }).get0();
+ remote_memtables->emplace_back(std::move(remote_mt));
+ }
+ keep_alive_sharder.push_back(sharder);
+
+ return mutation_source([&, remote_memtables, evict_paused_readers, single_fragment_buffer] (schema_ptr s,
+ reader_permit permit,
+ const dht::partition_range& range,
+ const query::partition_slice& slice,
+ const io_priority_class& pc,
+ tracing::trace_state_ptr trace_state,
+ streamed_mutation::forwarding fwd_sm,
+ mutation_reader::forwarding fwd_mr) mutable {
+ auto factory = [remote_memtables, single_fragment_buffer] (
+ schema_ptr s,
+ reader_permit permit,
+ const dht::partition_range& range,
+ const query::partition_slice& slice,
+ const io_priority_class& pc,
+ tracing::trace_state_ptr trace_state,
+ mutation_reader::forwarding fwd_mr) {
+ auto reader = remote_memtables->at(this_shard_id())->make_flat_reader(s, std::move(permit), range, slice, pc, std::move(trace_state),
+ streamed_mutation::forwarding::no, fwd_mr);
+ if (single_fragment_buffer) {
+ reader.set_max_buffer_size(1);
}
- return mr;
- });
+ return reader;
};
+
+ auto lifecycle_policy = seastar::make_shared<test_reader_lifecycle_policy>(std::move(factory), evict_paused_readers);
+ auto mr = make_multishard_combining_reader_for_tests(keep_alive_sharder.back(), std::move(lifecycle_policy), s,
+ std::move(permit), range, slice, pc, trace_state, fwd_mr);
+ if (fwd_sm == streamed_mutation::forwarding::yes) {
+ return make_forwardable(std::move(mr));
+ }
+ return mr;
+ });
+ };
}

// Best run with SMP >= 2
--
2.20.1

Pavel Emelyanov

unread,
Jul 27, 2021, 2:40:40 AMJul 27
to scylla...@googlegroups.com, Pavel Emelyanov
There are 3 independent cases in this test that benefit
from running in parallel.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
...ombining_reader_as_mutation_source_test.cc | 26 +++++++++++++++----
1 file changed, 21 insertions(+), 5 deletions(-)

diff --git a/test/boost/multishard_combining_reader_as_mutation_source_test.cc b/test/boost/multishard_combining_reader_as_mutation_source_test.cc
index f4eee54ed3..b16cb6c9d7 100644
--- a/test/boost/multishard_combining_reader_as_mutation_source_test.cc
+++ b/test/boost/multishard_combining_reader_as_mutation_source_test.cc
@@ -111,22 +111,38 @@ static auto make_populate(bool evict_paused_readers, bool single_fragment_buffer
}

// Best run with SMP >= 2
-SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_as_mutation_source) {
+SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader) {
if (smp::count < 2) {
std::cerr << "Cannot run test " << get_name() << " with smp::count < 2" << std::endl;
return;
}

do_with_cql_env_thread([&] (cql_test_env& env) -> future<> {
- testlog.info("run_mutation_source_tests(evict_readers=false, single_fragment_buffer=false)");
run_mutation_source_tests(make_populate(false, false));
+ return make_ready_future<>();
+ }).get();
+}
+
+SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_evict_paused) {
+ if (smp::count < 2) {
+ std::cerr << "Cannot run test " << get_name() << " with smp::count < 2" << std::endl;
+ return;
+ }

- testlog.info("run_mutation_source_tests(evict_readers=true, single_fragment_buffer=false)");
+ do_with_cql_env_thread([&] (cql_test_env& env) -> future<> {
run_mutation_source_tests(make_populate(true, false));
+ return make_ready_future<>();
+ }).get();
+}

- testlog.info("run_mutation_source_tests(evict_readers=true, single_fragment_buffer=true)");
- run_mutation_source_tests(make_populate(true, true));
+SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_with_tiny_buffer) {
+ if (smp::count < 2) {
+ std::cerr << "Cannot run test " << get_name() << " with smp::count < 2" << std::endl;
+ return;
+ }

+ do_with_cql_env_thread([&] (cql_test_env& env) -> future<> {
+ run_mutation_source_tests(make_populate(true, true));
return make_ready_future<>();
}).get();
}
--
2.20.1

Avi Kivity

unread,
Jul 27, 2021, 5:06:00 AMJul 27
to Pavel Emelyanov, scylla...@googlegroups.com, Botond Dénes
Botond, please review.

Botond Dénes

unread,
Jul 29, 2021, 4:27:02 AMJul 29
to Pavel Emelyanov, scylla...@googlegroups.com
Unrelated: I think now that we close readers we can get rid of this.
Not in this series.

Botond Dénes

unread,
Jul 29, 2021, 4:27:06 AMJul 29
to Avi Kivity, Pavel Emelyanov, scylla...@googlegroups.com
LGTM

Commit Bot

unread,
Jul 29, 2021, 4:39:50 AMJul 29
to scylla...@googlegroups.com, Pavel Emelyanov
From: Pavel Emelyanov <xe...@scylladb.com>
Committer: Pavel Emelyanov <xe...@scylladb.com>
Branch: next

test: Move out internals of test_multishard_combining_reader_as_mutation_source

Preparation. They will be called from 3 independent cases.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>

---
diff --git a/test/boost/multishard_combining_reader_as_mutation_source_test.cc b/test/boost/multishard_combining_reader_as_mutation_source_test.cc
--- a/test/boost/multishard_combining_reader_as_mutation_source_test.cc
+++ b/test/boost/multishard_combining_reader_as_mutation_source_test.cc
@@ -41,18 +41,10 @@
#include "schema_registry.hh"
#include "service/priority_manager.hh"

-// Best run with SMP >= 2
-SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_as_mutation_source) {
- if (smp::count < 2) {
- std::cerr << "Cannot run test " << get_name() << " with smp::count < 2" << std::endl;
- return;
- }
-
- // It has to be a container that does not invalidate pointers
- std::list<dummy_sharder> keep_alive_sharder;
+// It has to be a container that does not invalidate pointers
+static std::list<dummy_sharder> keep_alive_sharder;

Commit Bot

unread,
Jul 29, 2021, 4:39:51 AMJul 29
to scylla...@googlegroups.com, Pavel Emelyanov
From: Pavel Emelyanov <xe...@scylladb.com>
Committer: Pavel Emelyanov <xe...@scylladb.com>
Branch: next

test: Fix indentation after previous patch

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>

---
diff --git a/test/boost/multishard_combining_reader_as_mutation_source_test.cc b/test/boost/multishard_combining_reader_as_mutation_source_test.cc
--- a/test/boost/multishard_combining_reader_as_mutation_source_test.cc
+++ b/test/boost/multishard_combining_reader_as_mutation_source_test.cc
@@ -45,69 +45,69 @@
static std::list<dummy_sharder> keep_alive_sharder;

static auto make_populate(bool evict_paused_readers, bool single_fragment_buffer) {

Commit Bot

unread,
Jul 29, 2021, 4:39:53 AMJul 29
to scylla...@googlegroups.com, Pavel Emelyanov
From: Pavel Emelyanov <xe...@scylladb.com>
Committer: Pavel Emelyanov <xe...@scylladb.com>
Branch: next

test: Split test_multishard_combining_reader_as_mutation_source into 3

There are 3 independent cases in this test that benefit
from running in parallel.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>

---
diff --git a/test/boost/multishard_combining_reader_as_mutation_source_test.cc b/test/boost/multishard_combining_reader_as_mutation_source_test.cc
--- a/test/boost/multishard_combining_reader_as_mutation_source_test.cc
+++ b/test/boost/multishard_combining_reader_as_mutation_source_test.cc
@@ -111,22 +111,38 @@ static auto make_populate(bool evict_paused_readers, bool single_fragment_buffer
}

// Best run with SMP >= 2
-SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_as_mutation_source) {
+SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader) {
if (smp::count < 2) {
std::cerr << "Cannot run test " << get_name() << " with smp::count < 2" << std::endl;
return;
}

do_with_cql_env_thread([&] (cql_test_env& env) -> future<> {
- testlog.info("run_mutation_source_tests(evict_readers=false, single_fragment_buffer=false)");
run_mutation_source_tests(make_populate(false, false));
+ return make_ready_future<>();
+ }).get();
+}
+
+SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_evict_paused) {
+ if (smp::count < 2) {
+ std::cerr << "Cannot run test " << get_name() << " with smp::count < 2" << std::endl;
+ return;
+ }

- testlog.info("run_mutation_source_tests(evict_readers=true, single_fragment_buffer=false)");
+ do_with_cql_env_thread([&] (cql_test_env& env) -> future<> {
run_mutation_source_tests(make_populate(true, false));
+ return make_ready_future<>();
+ }).get();
+}

- testlog.info("run_mutation_source_tests(evict_readers=true, single_fragment_buffer=true)");
- run_mutation_source_tests(make_populate(true, true));
+SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_with_tiny_buffer) {
+ if (smp::count < 2) {
+ std::cerr << "Cannot run test " << get_name() << " with smp::count < 2" << std::endl;
+ return;
+ }

Commit Bot

unread,
Jul 29, 2021, 1:19:41 PMJul 29
to scylla...@googlegroups.com, Pavel Emelyanov
From: Pavel Emelyanov <xe...@scylladb.com>
Committer: Pavel Emelyanov <xe...@scylladb.com>
Branch: master

test: Move out internals of test_multishard_combining_reader_as_mutation_source

Preparation. They will be called from 3 independent cases.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>

---
diff --git a/test/boost/multishard_combining_reader_as_mutation_source_test.cc b/test/boost/multishard_combining_reader_as_mutation_source_test.cc
--- a/test/boost/multishard_combining_reader_as_mutation_source_test.cc
+++ b/test/boost/multishard_combining_reader_as_mutation_source_test.cc
@@ -41,18 +41,10 @@
#include "schema_registry.hh"
#include "service/priority_manager.hh"

-// Best run with SMP >= 2
-SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_as_mutation_source) {
- if (smp::count < 2) {
- std::cerr << "Cannot run test " << get_name() << " with smp::count < 2" << std::endl;
- return;
- }
-
- // It has to be a container that does not invalidate pointers
- std::list<dummy_sharder> keep_alive_sharder;
+// It has to be a container that does not invalidate pointers
+static std::list<dummy_sharder> keep_alive_sharder;

- do_with_cql_env_thread([&] (cql_test_env& env) -> future<> {
- auto make_populate = [&] (bool evict_paused_readers, bool single_fragment_buffer) {
+static auto make_populate(bool evict_paused_readers, bool single_fragment_buffer) {
return [&, evict_paused_readers, single_fragment_buffer] (schema_ptr s, const std::vector<mutation>& mutations) mutable {
// We need to group mutations that have the same token so they land on the same shard.
std::map<dht::token, std::vector<frozen_mutation>> mutations_by_token;
@@ -116,8 +108,16 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_as_mutation_source) {
return mr;
});
};
- };
+}

+// Best run with SMP >= 2
+SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_as_mutation_source) {
+ if (smp::count < 2) {
+ std::cerr << "Cannot run test " << get_name() << " with smp::count < 2" << std::endl;
+ return;
+ }
+
+ do_with_cql_env_thread([&] (cql_test_env& env) -> future<> {

Commit Bot

unread,
Jul 29, 2021, 1:19:43 PMJul 29
to scylla...@googlegroups.com, Pavel Emelyanov
From: Pavel Emelyanov <xe...@scylladb.com>
Committer: Pavel Emelyanov <xe...@scylladb.com>
Branch: master

test: Fix indentation after previous patch

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>

---
diff --git a/test/boost/multishard_combining_reader_as_mutation_source_test.cc b/test/boost/multishard_combining_reader_as_mutation_source_test.cc
--- a/test/boost/multishard_combining_reader_as_mutation_source_test.cc
+++ b/test/boost/multishard_combining_reader_as_mutation_source_test.cc
@@ -45,69 +45,69 @@
static std::list<dummy_sharder> keep_alive_sharder;

static auto make_populate(bool evict_paused_readers, bool single_fragment_buffer) {

Commit Bot

unread,
Jul 29, 2021, 1:19:44 PMJul 29
to scylla...@googlegroups.com, Pavel Emelyanov
From: Pavel Emelyanov <xe...@scylladb.com>
Committer: Pavel Emelyanov <xe...@scylladb.com>
Branch: master

test: Split test_multishard_combining_reader_as_mutation_source into 3

There are 3 independent cases in this test that benefit
from running in parallel.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>

---
diff --git a/test/boost/multishard_combining_reader_as_mutation_source_test.cc b/test/boost/multishard_combining_reader_as_mutation_source_test.cc
--- a/test/boost/multishard_combining_reader_as_mutation_source_test.cc
+++ b/test/boost/multishard_combining_reader_as_mutation_source_test.cc
@@ -111,22 +111,38 @@ static auto make_populate(bool evict_paused_readers, bool single_fragment_buffer
}

// Best run with SMP >= 2
-SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_as_mutation_source) {
+SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader) {
if (smp::count < 2) {
std::cerr << "Cannot run test " << get_name() << " with smp::count < 2" << std::endl;
return;
}

do_with_cql_env_thread([&] (cql_test_env& env) -> future<> {
- testlog.info("run_mutation_source_tests(evict_readers=false, single_fragment_buffer=false)");
run_mutation_source_tests(make_populate(false, false));
+ return make_ready_future<>();
+ }).get();
+}
+
+SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_evict_paused) {
+ if (smp::count < 2) {
+ std::cerr << "Cannot run test " << get_name() << " with smp::count < 2" << std::endl;
+ return;
+ }

- testlog.info("run_mutation_source_tests(evict_readers=true, single_fragment_buffer=false)");
+ do_with_cql_env_thread([&] (cql_test_env& env) -> future<> {
run_mutation_source_tests(make_populate(true, false));
+ return make_ready_future<>();
+ }).get();
+}

- testlog.info("run_mutation_source_tests(evict_readers=true, single_fragment_buffer=true)");
- run_mutation_source_tests(make_populate(true, true));
+SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_with_tiny_buffer) {
+ if (smp::count < 2) {
+ std::cerr << "Cannot run test " << get_name() << " with smp::count < 2" << std::endl;
+ return;
+ }

Reply all
Reply to author
Forward
0 new messages