[QUEUED scylla next] Merge 'Coroutinize distributed loader' from Benny Halevy

Skip to first unread message

Commit Bot

May 24, 2022, 11:02:21 AM
to scylladb-dev@googlegroups.com, Pavel Emelyanov
From: Pavel Emelyanov <xe...@scylladb.com>
Committer: Pavel Emelyanov <xe...@scylladb.com>
Branch: next

Merge 'Coroutinize distributed loader' from Benny Halevy

Before touching any of this code for https://github.com/scylladb/scylla/issues/9559,
which requires a change to how sstables are loaded from the staging subdirectory,
simplify the code using coroutines.

Closes #10609

* github.com:scylladb/scylla:
replica: distributed_loader: reindent populate_keyspace
replica: distributed_loader: coroutinize populate_keyspace
replica: distributed_loader: reindent handle_sstables_pending_delete
replica: distributed_loader: coroutinize handle_sstables_pending_delete
replica: distributed_loader: reindent cleanup_column_family_temp_sst_dirs
replica: distributed_loader: coroutinize cleanup_column_family_temp_sst_dirs
replica: distributed_loader: reindent make_sstables_available
replica: distributed_loader: coroutinize make_sstables_available
sstable_directory: parallel_for_each_restricted: keep func alive across calls
replica: distributed_loader: reindent reshape
replica: distributed_loader: coroutinize reshape
replica: distributed_loader: coroutinize reshard
replica: distributed_loader: reindent run_resharding_jobs
replica: distributed_loader: coroutinize run_resharding_jobs
replica: distributed_loader: reindent distribute_reshard_jobs
replica: distributed_loader: coroutinize distribute_reshard_jobs
replica: distributed_loader: reindent collect_all_shared_sstables
replica: distributed_loader: coroutinize collect_all_shared_sstables
replica: distributed_loader: reindent process_sstable_dir
replica: distributed_loader: coroutinize process_sstable_dir

diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc
--- a/replica/distributed_loader.cc
+++ b/replica/distributed_loader.cc
@@ -6,6 +6,9 @@
* SPDX-License-Identifier: AGPL-3.0-or-later

+#include <seastar/core/coroutine.hh>
+#include <seastar/coroutine/maybe_yield.hh>
+#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/util/closeable.hh>
#include "distributed_loader.hh"
#include "replica/database.hh"
@@ -86,21 +89,18 @@ class global_column_family_ptr {

distributed_loader::process_sstable_dir(sharded<sstables::sstable_directory>& dir, bool sort_sstables_according_to_owner) {
- return dir.invoke_on(0, [] (const sstables::sstable_directory& d) {
+ co_await dir.invoke_on(0, [] (const sstables::sstable_directory& d) {
return utils::directories::verify_owner_and_mode(d.sstable_dir());
- }).then([&dir, sort_sstables_according_to_owner] {
- return dir.invoke_on_all([&dir, sort_sstables_according_to_owner] (sstables::sstable_directory& d) {
+ });
+ co_await dir.invoke_on_all([&dir, sort_sstables_according_to_owner] (sstables::sstable_directory& d) -> future<> {
// Supposed to be called with the node either down or on behalf of maintenance tasks
// like nodetool refresh
- return d.process_sstable_dir(service::get_local_streaming_priority(), sort_sstables_according_to_owner).then([&dir, &d] {
- return d.move_foreign_sstables(dir);
- });
- });
- }).then([&dir] {
- return dir.invoke_on_all([&dir] (sstables::sstable_directory& d) {
- return d.commit_directory_changes();
- });
+ co_await d.process_sstable_dir(service::get_local_streaming_priority(), sort_sstables_according_to_owner);
+ co_await d.move_foreign_sstables(dir);
+ co_await dir.invoke_on_all(&sstables::sstable_directory::commit_directory_changes);

@@ -136,29 +136,27 @@ struct reshard_shard_descriptor {
// manipulate it in a do_for_each loop (which yields) instead of using standard accumulators.
collect_all_shared_sstables(sharded<sstables::sstable_directory>& dir) {
- return do_with(sstables::sstable_directory::sstable_info_vector(), [&dir] (sstables::sstable_directory::sstable_info_vector& info_vec) {
- // We want to make sure that each distributed object reshards about the same amount of data.
- // Each sharded object has its own shared SSTables. We can use a clever algorithm in which they
- // all distributely figure out which SSTables to exchange, but we'll keep it simple and move all
- // their foreign_sstable_open_info to a coordinator (the shard who called this function). We can
- // move in bulk and that's efficient. That shard can then distribute the work among all the
- // others who will reshard.
- auto coordinator = this_shard_id();
- // We will first move all of the foreign open info to temporary storage so that we can sort
- // them. We want to distribute bigger sstables first.
- return dir.invoke_on_all([&info_vec, coordinator] (sstables::sstable_directory& d) {
- return smp::submit_to(coordinator, [&info_vec, info = d.retrieve_shared_sstables()] () mutable {
- // We want do_for_each here instead of a loop to avoid stalls. Resharding can be
- // called during node operations too. For example, if it is called to load new
- // SSTables into the system.
- return do_for_each(info, [&info_vec] (sstables::foreign_sstable_open_info& info) {
- info_vec.push_back(std::move(info));
- });
- });
- }).then([&info_vec] () mutable {
- return make_ready_future<sstables::sstable_directory::sstable_info_vector>(std::move(info_vec));
+ auto info_vec = sstables::sstable_directory::sstable_info_vector();
+ // We want to make sure that each distributed object reshards about the same amount of data.
+ // Each sharded object has its own shared SSTables. We can use a clever algorithm in which they
+ // all distributely figure out which SSTables to exchange, but we'll keep it simple and move all
+ // their foreign_sstable_open_info to a coordinator (the shard who called this function). We can
+ // move in bulk and that's efficient. That shard can then distribute the work among all the
+ // others who will reshard.
+ auto coordinator = this_shard_id();
+ // We will first move all of the foreign open info to temporary storage so that we can sort
+ // them. We want to distribute bigger sstables first.
+ co_await dir.invoke_on_all([&info_vec, coordinator] (sstables::sstable_directory& d) -> future<> {
+ co_await smp::submit_to(coordinator, [&] () -> future<> {
+ for (auto& info : d.retrieve_shared_sstables()) {
+ info_vec.push_back(std::move(info));
+ co_await coroutine::maybe_yield();
+ }
+ co_return info_vec;

// Given a vector of shared sstables to be resharded, distribute it among all shards.
@@ -167,49 +165,46 @@ collect_all_shared_sstables(sharded<sstables::sstable_directory>& dir) {
// Returns a reshard_shard_descriptor per shard indicating the work that each shard has to do.
distribute_reshard_jobs(sstables::sstable_directory::sstable_info_vector source) {
- return do_with(std::move(source), std::vector<reshard_shard_descriptor>(smp::count),
- [] (sstables::sstable_directory::sstable_info_vector& source, std::vector<reshard_shard_descriptor>& destinations) mutable {
- std::sort(source.begin(), source.end(), [] (const sstables::foreign_sstable_open_info& a, const sstables::foreign_sstable_open_info& b) {
- // Sort on descending SSTable sizes.
- return a.uncompressed_data_size > b.uncompressed_data_size;
- });
- return do_for_each(source, [&destinations] (sstables::foreign_sstable_open_info& info) mutable {
- auto shard_it = boost::min_element(destinations, std::mem_fn(&reshard_shard_descriptor::total_size_smaller));
- shard_it->uncompressed_data_size += info.uncompressed_data_size;
- shard_it->info_vec.push_back(std::move(info));
- }).then([&destinations] () mutable {
- return make_ready_future<std::vector<reshard_shard_descriptor>>(std::move(destinations));
- });
+ auto destinations = std::vector<reshard_shard_descriptor>(smp::count);
+ std::sort(source.begin(), source.end(), [] (const sstables::foreign_sstable_open_info& a, const sstables::foreign_sstable_open_info& b) {
+ // Sort on descending SSTable sizes.
+ return a.uncompressed_data_size > b.uncompressed_data_size;
+ for (auto& info : source) {
+ auto shard_it = boost::min_element(destinations, std::mem_fn(&reshard_shard_descriptor::total_size_smaller));
+ shard_it->uncompressed_data_size += info.uncompressed_data_size;
+ shard_it->info_vec.push_back(std::move(info));
+ co_await coroutine::maybe_yield();
+ }
+ co_return destinations;

future<> run_resharding_jobs(sharded<sstables::sstable_directory>& dir, std::vector<reshard_shard_descriptor> reshard_jobs,
sharded<replica::database>& db, sstring ks_name, sstring table_name, sstables::compaction_sstable_creator_fn creator) {

uint64_t total_size = boost::accumulate(reshard_jobs | boost::adaptors::transformed(std::mem_fn(&reshard_shard_descriptor::size)), uint64_t(0));
if (total_size == 0) {
- return make_ready_future<>();
+ co_return;

- return do_with(std::move(reshard_jobs), [&dir, &db, ks_name, table_name, creator = std::move(creator), total_size] (std::vector<reshard_shard_descriptor>& reshard_jobs) {
- auto start = std::chrono::steady_clock::now();
- dblog.info("{}", fmt::format("Resharding {} for {}.{}", sstables::pretty_printed_data_size(total_size), ks_name, table_name));
- return dir.invoke_on_all([&dir, &db, &reshard_jobs, ks_name, table_name, creator] (sstables::sstable_directory& d) mutable {
- auto& table = db.local().find_column_family(ks_name, table_name);
- auto info_vec = std::move(reshard_jobs[this_shard_id()].info_vec);
- auto& cm = db.local().get_compaction_manager();
- auto max_threshold = table.schema()->max_compaction_threshold();
- auto& iop = service::get_local_streaming_priority();
- return d.reshard(std::move(info_vec), cm, table, max_threshold, creator, iop).then([&d, &dir] {
- return d.move_foreign_sstables(dir);
- });
- }).then([start, total_size, ks_name, table_name] {
- auto duration = std::chrono::duration_cast<std::chrono::duration<float>>(std::chrono::steady_clock::now() - start);
- dblog.info("{}", fmt::format("Resharded {} for {}.{} in {:.2f} seconds, {}", sstables::pretty_printed_data_size(total_size), ks_name, table_name, duration.count(), sstables::pretty_printed_throughput(total_size, duration)));
- return make_ready_future<>();
- });
+ auto start = std::chrono::steady_clock::now();
+ dblog.info("{}", fmt::format("Resharding {} for {}.{}", sstables::pretty_printed_data_size(total_size), ks_name, table_name));
+ co_await dir.invoke_on_all([&] (sstables::sstable_directory& d) -> future<> {
+ auto& table = db.local().find_column_family(ks_name, table_name);
+ auto info_vec = std::move(reshard_jobs[this_shard_id()].info_vec);
+ auto& cm = db.local().get_compaction_manager();
+ auto max_threshold = table.schema()->max_compaction_threshold();
+ auto& iop = service::get_local_streaming_priority();
+ co_await d.reshard(std::move(info_vec), cm, table, max_threshold, creator, iop);
+ co_await d.move_foreign_sstables(dir);
+ auto duration = std::chrono::duration_cast<std::chrono::duration<float>>(std::chrono::steady_clock::now() - start);
+ dblog.info("{}", fmt::format("Resharded {} for {}.{} in {:.2f} seconds, {}", sstables::pretty_printed_data_size(total_size), ks_name, table_name, duration.count(), sstables::pretty_printed_throughput(total_size, duration)));

// Global resharding function. Done in two parts:
@@ -219,11 +214,9 @@ future<> run_resharding_jobs(sharded<sstables::sstable_directory>& dir, std::vec
// assigned.
distributed_loader::reshard(sharded<sstables::sstable_directory>& dir, sharded<replica::database>& db, sstring ks_name, sstring table_name, sstables::compaction_sstable_creator_fn creator) {
- return collect_all_shared_sstables(dir).then([] (sstables::sstable_directory::sstable_info_vector all_jobs) mutable {
- return distribute_reshard_jobs(std::move(all_jobs));
- }).then([&dir, &db, ks_name, table_name, creator = std::move(creator)] (std::vector<reshard_shard_descriptor> destinations) mutable {
- return run_resharding_jobs(dir, std::move(destinations), db, ks_name, table_name, std::move(creator));
- });
+ auto all_jobs = co_await collect_all_shared_sstables(dir);
+ auto destinations = co_await distribute_reshard_jobs(std::move(all_jobs));
+ co_await run_resharding_jobs(dir, std::move(destinations), db, ks_name, table_name, std::move(creator));

@@ -247,18 +240,17 @@ distributed_loader::reshape(sharded<sstables::sstable_directory>& dir, sharded<r
std::function<bool (const sstables::shared_sstable&)> filter) {

auto start = std::chrono::steady_clock::now();
- return dir.map_reduce0([&dir, &db, ks_name = std::move(ks_name), table_name = std::move(table_name), creator = std::move(creator), mode, filter] (sstables::sstable_directory& d) {
+ auto total_size = co_await dir.map_reduce0([&dir, &db, ks_name = std::move(ks_name), table_name = std::move(table_name), creator = std::move(creator), mode, filter] (sstables::sstable_directory& d) {
auto& table = db.local().find_column_family(ks_name, table_name);
auto& cm = db.local().get_compaction_manager();
auto& iop = service::get_local_streaming_priority();
return d.reshape(cm, table, creator, iop, mode, filter);
- }, uint64_t(0), std::plus<uint64_t>()).then([start] (uint64_t total_size) {
- if (total_size > 0) {
- auto duration = std::chrono::duration_cast<std::chrono::duration<float>>(std::chrono::steady_clock::now() - start);
- dblog.info("{}", fmt::format("Reshaped {} in {:.2f} seconds, {}", sstables::pretty_printed_data_size(total_size), duration.count(), sstables::pretty_printed_throughput(total_size, duration)));
- }
- return make_ready_future<>();
- });
+ }, uint64_t(0), std::plus<uint64_t>());
+ if (total_size > 0) {
+ auto duration = std::chrono::duration_cast<std::chrono::duration<float>>(std::chrono::steady_clock::now() - start);
+ dblog.info("{}", fmt::format("Reshaped {} in {:.2f} seconds, {}", sstables::pretty_printed_data_size(total_size), duration.count(), sstables::pretty_printed_throughput(total_size, duration)));
+ }

// Loads SSTables into the main directory (or staging) and returns how many were loaded
@@ -267,43 +259,34 @@ distributed_loader::make_sstables_available(sstables::sstable_directory& dir, sh
sharded<db::view::view_update_generator>& view_update_generator, fs::path datadir, sstring ks, sstring cf) {

auto& table = db.local().find_column_family(ks, cf);
+ auto new_sstables = std::vector<sstables::shared_sstable>();
+ co_await dir.do_for_each_sstable([&table, datadir = std::move(datadir), &new_sstables] (sstables::shared_sstable sst) -> future<> {
+ auto gen = table.calculate_generation_for_new_table();
+ dblog.trace("Loading {} into {}, new generation {}", sst->get_filename(), datadir.native(), gen);
+ co_await sst->move_to_new_dir(datadir.native(), gen, true);
+ // When loading an imported sst, set level to 0 because it may overlap with existing ssts on higher levels.
+ sst->set_sstable_level(0);
+ new_sstables.push_back(std::move(sst));
+ });

- return do_with(std::vector<sstables::shared_sstable>(),
- [&table, &dir, &view_update_generator, datadir = std::move(datadir)] (std::vector<sstables::shared_sstable>& new_sstables) {
- return dir.do_for_each_sstable([&table, datadir = std::move(datadir), &new_sstables] (sstables::shared_sstable sst) {
- auto gen = table.calculate_generation_for_new_table();
- dblog.trace("Loading {} into {}, new generation {}", sst->get_filename(), datadir.native(), gen);
- return sst->move_to_new_dir(datadir.native(), gen, true).then([&table, &new_sstables, sst] {
- // When loading an imported sst, set level to 0 because it may overlap with existing ssts on higher levels.
- sst->set_sstable_level(0);
- new_sstables.push_back(std::move(sst));
- return make_ready_future<>();
- });
- }).then([&table, &new_sstables] {
- // nothing loaded
- if (new_sstables.empty()) {
- return make_ready_future<>();
- }
+ // nothing loaded
+ if (new_sstables.empty()) {
+ co_return 0;
+ }

- return table.add_sstables_and_update_cache(new_sstables).handle_exception([&table] (std::exception_ptr ep) {
- dblog.error("Failed to load SSTables for {}.{}: {}. Aborting.", table.schema()->ks_name(), table.schema()->cf_name(), ep);
- abort();
- });
- }).then([&view_update_generator, &table, &new_sstables] {
- return parallel_for_each(new_sstables, [&view_update_generator, &table] (sstables::shared_sstable& sst) {
- if (sst->requires_view_building()) {
- return view_update_generator.local().register_staging_sstable(sst, table.shared_from_this());
- }
- return make_ready_future<>();
- });
- }).then_wrapped([&new_sstables] (future<> f) {
- if (!f.failed()) {
- return make_ready_future<size_t>(new_sstables.size());
- } else {
- return make_exception_future<size_t>(f.get_exception());
- }
- });
+ co_await table.add_sstables_and_update_cache(new_sstables).handle_exception([&table] (std::exception_ptr ep) {
+ dblog.error("Failed to load SSTables for {}.{}: {}. Aborting.", table.schema()->ks_name(), table.schema()->cf_name(), ep);
+ abort();
+ co_await coroutine::parallel_for_each(new_sstables, [&view_update_generator, &table] (sstables::shared_sstable sst) -> future<> {
+ if (sst->requires_view_building()) {
+ co_await view_update_generator.local().register_staging_sstable(sst, table.shared_from_this());
+ }
+ });
+ co_return new_sstables.size();

@@ -408,48 +391,48 @@ distributed_loader::get_sstables_from_upload_dir(distributed<replica::database>&

future<> distributed_loader::cleanup_column_family_temp_sst_dirs(sstring sstdir) {
- return do_with(std::vector<future<>>(), [sstdir = std::move(sstdir)] (std::vector<future<>>& futures) {
- return lister::scan_dir(sstdir, { directory_entry_type::directory }, [&futures] (fs::path sstdir, directory_entry de) {
- // push futures that remove files/directories into an array of futures,
- // so that the supplied callback will not block scan_dir() from
- // reading the next entry in the directory.
- fs::path dirpath = sstdir / de.name;
- if (sstables::sstable::is_temp_dir(dirpath)) {
- dblog.info("Found temporary sstable directory: {}, removing", dirpath);
- futures.push_back(io_check([dirpath = std::move(dirpath)] () { return lister::rmdir(dirpath); }));
- }
- return make_ready_future<>();
- }).then([&futures] {
- return when_all_succeed(futures.begin(), futures.end()).discard_result();
- });
+ std::vector<future<>> futures;
+ co_await lister::scan_dir(sstdir, { directory_entry_type::directory }, [&] (fs::path sstdir, directory_entry de) {
+ // push futures that remove files/directories into an array of futures,
+ // so that the supplied callback will not block scan_dir() from
+ // reading the next entry in the directory.
+ fs::path dirpath = sstdir / de.name;
+ if (sstables::sstable::is_temp_dir(dirpath)) {
+ dblog.info("Found temporary sstable directory: {}, removing", dirpath);
+ futures.push_back(io_check([dirpath = std::move(dirpath)] () { return lister::rmdir(dirpath); }));
+ }
+ return make_ready_future<>();
+ co_await when_all_succeed(futures.begin(), futures.end()).discard_result();

future<> distributed_loader::handle_sstables_pending_delete(sstring pending_delete_dir) {
- return do_with(std::vector<future<>>(), [dir = std::move(pending_delete_dir)] (std::vector<future<>>& futures) {
- return lister::scan_dir(dir, { directory_entry_type::regular }, [&futures] (fs::path dir, directory_entry de) {
- // push nested futures that remove files/directories into an array of futures,
- // so that the supplied callback will not block scan_dir() from
- // reading the next entry in the directory.
- fs::path file_path = dir / de.name;
- if (file_path.extension() == ".tmp") {
- dblog.info("Found temporary pending_delete log file: {}, deleting", file_path);
- futures.push_back(remove_file(file_path.string()));
- } else if (file_path.extension() == ".log") {
- dblog.info("Found pending_delete log file: {}, replaying", file_path);
- auto f = sstables::replay_pending_delete_log(file_path.string()).then([file_path = std::move(file_path)] {
- dblog.debug("Replayed {}, removing", file_path);
- return remove_file(file_path.string());
- });
- futures.push_back(std::move(f));
- } else {
- dblog.debug("Found unknown file in pending_delete directory: {}, ignoring", file_path);
- }
- return make_ready_future<>();
- }).then([&futures] {
- return when_all_succeed(futures.begin(), futures.end()).discard_result();
- });
+ std::vector<future<>> futures;
+ co_await lister::scan_dir(pending_delete_dir, { directory_entry_type::regular }, [&futures] (fs::path dir, directory_entry de) {
+ // push nested futures that remove files/directories into an array of futures,
+ // so that the supplied callback will not block scan_dir() from
+ // reading the next entry in the directory.
+ fs::path file_path = dir / de.name;
+ if (file_path.extension() == ".tmp") {
+ dblog.info("Found temporary pending_delete log file: {}, deleting", file_path);
+ futures.push_back(remove_file(file_path.string()));
+ } else if (file_path.extension() == ".log") {
+ dblog.info("Found pending_delete log file: {}, replaying", file_path);
+ auto f = sstables::replay_pending_delete_log(file_path.string()).then([file_path = std::move(file_path)] {
+ dblog.debug("Replayed {}, removing", file_path);
+ return remove_file(file_path.string());
+ });
+ futures.push_back(std::move(f));
+ } else {
+ dblog.debug("Found unknown file in pending_delete directory: {}, ignoring", file_path);
+ }
+ return make_ready_future<>();
+ co_await when_all_succeed(futures.begin(), futures.end()).discard_result();

future<> distributed_loader::populate_column_family(distributed<replica::database>& db, sstring sstdir, sstring ks, sstring cf, allow_offstrategy_compaction do_allow_offstrategy_compaction, must_exist dir_must_exist) {
@@ -547,41 +530,41 @@ future<> distributed_loader::populate_keyspace(distributed<replica::database>& d
auto i = keyspaces.find(ks_name);
if (i == keyspaces.end()) {
dblog.warn("Skipping undefined keyspace: {}", ks_name);
- return make_ready_future<>();
- } else {
- dblog.info("Populating Keyspace {}", ks_name);
- auto& ks = i->second;
- auto& column_families = db.local().get_column_families();
- return parallel_for_each(ks.metadata()->cf_meta_data() | boost::adaptors::map_values,
- [ks_name, ksdir, &ks, &column_families, &db] (schema_ptr s) {
- utils::UUID uuid = s->id();
- lw_shared_ptr<replica::column_family> cf = column_families[uuid];
- sstring cfname = cf->schema()->cf_name();
- auto sstdir = ks.column_family_directory(ksdir, cfname, uuid);
- dblog.info("Keyspace {}: Reading CF {} id={} version={}", ks_name, cfname, uuid, s->version());
- return ks.make_directory_for_column_family(cfname, uuid).then([&db, sstdir, uuid, ks_name, cfname] {
- return distributed_loader::populate_column_family(db, sstdir + "/" + sstables::staging_dir, ks_name, cfname, allow_offstrategy_compaction::no);
- }).then([&db, sstdir, ks_name, cfname] {
- return distributed_loader::populate_column_family(db, sstdir + "/" + sstables::quarantine_dir, ks_name, cfname, allow_offstrategy_compaction::no, must_exist::no);
- }).then([&db, sstdir, uuid, ks_name, cfname] {
- return distributed_loader::populate_column_family(db, sstdir, ks_name, cfname, allow_offstrategy_compaction::yes);
- }).handle_exception([ks_name, cfname, sstdir](std::exception_ptr eptr) {
- std::string msg =
- format("Exception while populating keyspace '{}' with column family '{}' from file '{}': {}",
- ks_name, cfname, sstdir, eptr);
- dblog.error("Exception while populating keyspace '{}' with column family '{}' from file '{}': {}",
- ks_name, cfname, sstdir, eptr);
- try {
- std::rethrow_exception(eptr);
- } catch (sstables::compaction_stopped_exception& e) {
- // swallow compaction stopped exception, to allow clean shutdown.
- } catch (...) {
- throw std::runtime_error(msg.c_str());
- }
- });
- });
+ co_return;
+ dblog.info("Populating Keyspace {}", ks_name);
+ auto& ks = i->second;
+ auto& column_families = db.local().get_column_families();
+ co_await coroutine::parallel_for_each(ks.metadata()->cf_meta_data() | boost::adaptors::map_values, [&] (schema_ptr s) -> future<> {
+ utils::UUID uuid = s->id();
+ lw_shared_ptr<replica::column_family> cf = column_families[uuid];
+ sstring cfname = cf->schema()->cf_name();
+ auto sstdir = ks.column_family_directory(ksdir, cfname, uuid);
+ dblog.info("Keyspace {}: Reading CF {} id={} version={}", ks_name, cfname, uuid, s->version());
+ try {
+ co_await ks.make_directory_for_column_family(cfname, uuid);
+ co_await distributed_loader::populate_column_family(db, sstdir + "/" + sstables::staging_dir, ks_name, cfname, allow_offstrategy_compaction::no);
+ co_await distributed_loader::populate_column_family(db, sstdir + "/" + sstables::quarantine_dir, ks_name, cfname, allow_offstrategy_compaction::no, must_exist::no);
+ co_await distributed_loader::populate_column_family(db, sstdir, ks_name, cfname, allow_offstrategy_compaction::yes);
+ } catch (...) {
+ std::exception_ptr eptr = std::current_exception();
+ std::string msg =
+ format("Exception while populating keyspace '{}' with column family '{}' from file '{}': {}",
+ ks_name, cfname, sstdir, eptr);
+ dblog.error("Exception while populating keyspace '{}' with column family '{}' from file '{}': {}",
+ ks_name, cfname, sstdir, eptr);
+ try {
+ std::rethrow_exception(eptr);
+ } catch (sstables::compaction_stopped_exception& e) {
+ // swallow compaction stopped exception, to allow clean shutdown.
+ } catch (...) {
+ throw std::runtime_error(msg.c_str());
+ }
+ }
+ });

future<> distributed_loader::init_system_keyspace(distributed<replica::database>& db, distributed<service::storage_service>& ss, sharded<gms::gossiper>& g, db::config& cfg) {
diff --git a/sstables/sstable_directory.cc b/sstables/sstable_directory.cc
--- a/sstables/sstable_directory.cc
+++ b/sstables/sstable_directory.cc
@@ -430,9 +430,9 @@ sstable_directory::do_for_each_sstable(std::function<future<>(sstables::shared_s
template <typename Container, typename Func>
sstable_directory::parallel_for_each_restricted(Container&& C, Func&& func) {
- return do_with(std::move(C), [this, func = std::move(func)] (Container& c) mutable {
- return max_concurrent_for_each(c, _load_parallelism, [this, func = std::move(func)] (auto& el) mutable {
- return with_semaphore(_load_semaphore, 1, [this, func, el = std::move(el)] () mutable {
+ return do_with(std::move(C), std::move(func), [this] (Container& c, Func& func) mutable {
+ return max_concurrent_for_each(c, _load_parallelism, [this, &func] (auto& el) mutable {
+ return with_semaphore(_load_semaphore, 1, [this, &func, el = std::move(el)] () mutable {
return func(el);

Commit Bot

May 25, 2022, 12:07:58 AM
to scylladb-dev@googlegroups.com, Pavel Emelyanov
From: Pavel Emelyanov <xe...@scylladb.com>
Committer: Pavel Emelyanov <xe...@scylladb.com>
Branch: master
Reply all
Reply to author
0 new messages