[QUEUED scylla next] replica: distributed_loader: coroutinize populate_keyspace

0 views
Skip to first unread message

Commit Bot

<bot@cloudius-systems.com>
unread,
May 24, 2022, 11:02:18 AM (5/24/22)
to scylladb-dev@googlegroups.com, Benny Halevy
From: Benny Halevy <bha...@scylladb.com>
Committer: Benny Halevy <bha...@scylladb.com>
Branch: next

replica: distributed_loader: coroutinize populate_keyspace

Signed-off-by: Benny Halevy <bha...@scylladb.com>

---
diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc
--- a/replica/distributed_loader.cc
+++ b/replica/distributed_loader.cc
@@ -530,26 +530,28 @@ future<> distributed_loader::populate_keyspace(distributed<replica::database>& d
auto i = keyspaces.find(ks_name);
if (i == keyspaces.end()) {
dblog.warn("Skipping undefined keyspace: {}", ks_name);
- return make_ready_future<>();
- } else {
+ co_return;
+ }
+
+ // FIXME: reindent
dblog.info("Populating Keyspace {}", ks_name);
auto& ks = i->second;
auto& column_families = db.local().get_column_families();

- return parallel_for_each(ks.metadata()->cf_meta_data() | boost::adaptors::map_values,
- [ks_name, ksdir, &ks, &column_families, &db] (schema_ptr s) {
+ co_await coroutine::parallel_for_each(ks.metadata()->cf_meta_data() | boost::adaptors::map_values, [&] (schema_ptr s) -> future<> {
utils::UUID uuid = s->id();
lw_shared_ptr<replica::column_family> cf = column_families[uuid];
sstring cfname = cf->schema()->cf_name();
auto sstdir = ks.column_family_directory(ksdir, cfname, uuid);
dblog.info("Keyspace {}: Reading CF {} id={} version={}", ks_name, cfname, uuid, s->version());
- return ks.make_directory_for_column_family(cfname, uuid).then([&db, sstdir, uuid, ks_name, cfname] {
- return distributed_loader::populate_column_family(db, sstdir + "/" + sstables::staging_dir, ks_name, cfname, allow_offstrategy_compaction::no);
- }).then([&db, sstdir, ks_name, cfname] {
- return distributed_loader::populate_column_family(db, sstdir + "/" + sstables::quarantine_dir, ks_name, cfname, allow_offstrategy_compaction::no, must_exist::no);
- }).then([&db, sstdir, uuid, ks_name, cfname] {
- return distributed_loader::populate_column_family(db, sstdir, ks_name, cfname, allow_offstrategy_compaction::yes);
- }).handle_exception([ks_name, cfname, sstdir](std::exception_ptr eptr) {
+
+ try {
+ co_await ks.make_directory_for_column_family(cfname, uuid);
+ co_await distributed_loader::populate_column_family(db, sstdir + "/" + sstables::staging_dir, ks_name, cfname, allow_offstrategy_compaction::no);
+ co_await distributed_loader::populate_column_family(db, sstdir + "/" + sstables::quarantine_dir, ks_name, cfname, allow_offstrategy_compaction::no, must_exist::no);
+ co_await distributed_loader::populate_column_family(db, sstdir, ks_name, cfname, allow_offstrategy_compaction::yes);
+ } catch (...) {
+ std::exception_ptr eptr = std::current_exception();
std::string msg =
format("Exception while populating keyspace '{}' with column family '{}' from file '{}': {}",
ks_name, cfname, sstdir, eptr);
@@ -562,9 +564,8 @@ future<> distributed_loader::populate_keyspace(distributed<replica::database>& d
} catch (...) {
throw std::runtime_error(msg.c_str());
}
- });
- });
- }
+ }
+ });
}

future<> distributed_loader::init_system_keyspace(distributed<replica::database>& db, distributed<service::storage_service>& ss, sharded<gms::gossiper>& g, db::config& cfg) {

Commit Bot

<bot@cloudius-systems.com>
unread,
May 25, 2022, 12:07:45 AM (5/25/22)
to scylladb-dev@googlegroups.com, Benny Halevy
From: Benny Halevy <bha...@scylladb.com>
Committer: Benny Halevy <bha...@scylladb.com>
Branch: master
Reply all
Reply to author
Forward
0 new messages