[QUEUED scylla next] db: Avoid memtable flush latency on schema merge

Commit Bot

<bot@cloudius-systems.com>
Jul 7, 2022, 9:04:10 AM
to scylladb-dev@googlegroups.com, Tomasz Grabiec
From: Tomasz Grabiec <tgra...@scylladb.com>
Committer: Tomasz Grabiec <tgra...@scylladb.com>
Branch: next

db: Avoid memtable flush latency on schema merge

Currently, applying schema mutations involves flushing all schema
tables so that on restart commit log replay is performed on top of
latest schema (for correctness). The downside is that schema merge is
very sensitive to fdatasync latency. Flushing a single memtable
involves many syncs, and we flush several of them. It was observed to
take as long as 30 seconds on GCE disks under some conditions.

This patch changes the schema merge to rely on a separate commit log
to replay the mutations on restart. This way it doesn't have to wait
for memtables to be flushed. It has to wait for the commitlog to be
synced, but this cost is well amortized.

We put the mutations into a separate commit log so that schema can be
recovered before replaying user mutations. This is necessary because
regular writes depend on the schema version, and replaying on top of
the latest schema satisfies all dependencies. Without this, we could
lose writes if we replay a write which depends on the latest schema on
top of an old schema.
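
The ordering argument above can be illustrated with a toy model. This is
only a sketch, not code from the patch: write_entry, toy_replica and both
recover_* functions are hypothetical names, and the rule "a write that
depends on a newer schema is dropped" is a simplifying assumption that
stands in for the real failure mode.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Toy model only; none of these names exist in Scylla.
struct write_entry {
    int schema_version; // schema version the write depends on
    int value;
};

struct toy_replica {
    int schema_version;       // schema version currently known to the node
    std::vector<int> applied; // values of writes that survived replay

    void replay_schema(int version) {
        assert(version >= 0);
        if (version > schema_version) {
            schema_version = version;
        }
    }

    // Assumption of the model: a write that depends on a schema newer than
    // the one the node knows cannot be interpreted and is dropped.
    void replay_write(const write_entry& w) {
        if (w.schema_version <= schema_version) {
            applied.push_back(w.value);
        }
    }
};

// Replays the schema log first, then the data log.
// Returns the number of writes that survived.
inline std::size_t recover_schema_first(int on_disk_version,
        const std::vector<int>& schema_log,
        const std::vector<write_entry>& data_log) {
    toy_replica r{on_disk_version, {}};
    for (int v : schema_log) { r.replay_schema(v); }
    for (const auto& w : data_log) { r.replay_write(w); }
    return r.applied.size();
}

// Replays the data log on top of the old on-disk schema, then the schema log.
// Returns the number of writes that survived.
inline std::size_t recover_data_first(int on_disk_version,
        const std::vector<int>& schema_log,
        const std::vector<write_entry>& data_log) {
    toy_replica r{on_disk_version, {}};
    for (const auto& w : data_log) { r.replay_write(w); }
    for (int v : schema_log) { r.replay_schema(v); }
    return r.applied.size();
}
```

In this model, with schema version 1 on disk, version 2 in the schema log
and three writes of which two depend on version 2, replaying the schema
log first keeps all three writes while replaying data first keeps one.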

Also, with a separate commit log for schema we can delay schema
parsing until after the replay and avoid the complexity of recognizing
schema transactions in the log and invoking the schema merge logic.

One complication with this change is that replay_position markers are
commitlog-domain specific and cannot cross domains. They are recorded
in various places which survive node restart: sstables are annotated
with the maximum replay position, and they are present inside
truncation records. The former annotation is used by the "truncate"
operation to drop sstables. To prevent old replay positions from being
interpreted in the context of the new schema commitlog domain, the
change refuses to boot if there are truncation records, and also
prohibits truncation of schema tables.
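
A minimal sketch of why positions cannot cross domains, and of the boot
check described above. All names here (log_domain, tagged_position,
table_info, precedes, check_boot_safe) are hypothetical. In particular,
the explicit domain tag is an assumption made for illustration: stored
replay positions carry no such tag, which is why the patch refuses to
boot instead of trying to compare them.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical domain tag; real replay positions carry no such field.
enum class log_domain { regular, schema };

struct tagged_position {
    log_domain domain;
    std::uint64_t segment;
    std::uint32_t offset;
};

// Orders two positions. Ordering is only meaningful within one domain.
inline bool precedes(const tagged_position& a, const tagged_position& b) {
    if (a.domain != b.domain) {
        throw std::logic_error("replay positions from different commitlog domains");
    }
    return a.segment < b.segment
        || (a.segment == b.segment && a.offset < b.offset);
}

struct table_info {
    std::string keyspace;
    std::string name;
    std::optional<tagged_position> truncation_record;
};

// True if any schema table carries a truncation record.
inline bool has_schema_truncation_record(const std::vector<table_info>& tables) {
    for (const auto& t : tables) {
        if (t.keyspace == "system_schema" && t.truncation_record) {
            return true;
        }
    }
    return false;
}

// Refuses to continue if a schema table has a truncation record, because the
// recorded position may belong to the old (regular) commitlog domain.
inline void check_boot_safe(const std::vector<table_info>& tables) {
    for (const auto& t : tables) {
        if (t.keyspace == "system_schema" && t.truncation_record) {
            throw std::runtime_error("Schema table " + t.keyspace + "." + t.name
                + " has a truncation record. Booting is not safe.");
        }
    }
}
```

Truncation records on tables outside system_schema do not trip the check
in this sketch, since those tables stay in the regular commitlog domain.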

The boot sequence needs to know whether the cluster feature associated
with this change was enabled on all nodes. Features are stored in
system.scylla_local. Because we need to read it before initializing
schema tables, the initialization of tables now has to be split into
two phases. The first phase initializes all system tables except
schema tables, and later we initialize schema tables, after reading
stored cluster features.

The commitlog domain is switched only when all nodes are upgraded, and
only after the node is restarted. This is so that we don't have to add
risky code to deal with hot-switching of the commitlog domain. Cold
switching is safer. This means that after upgrade there is a need for
yet another rolling restart round.
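
The cold-switch rule can be sketched as a small state machine. This is
an illustration under stated assumptions, not the patch's code: toy_node
and its members are hypothetical, and feature_persisted stands in for the
cluster feature stored in system.scylla_local.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Toy model of one node; names are hypothetical.
struct toy_node {
    bool feature_persisted = false;     // survives restarts
    bool booted = false;
    bool uses_schema_commitlog = false; // decided once per boot, never at runtime
    std::vector<std::string> log;

    // The commitlog domain is chosen at boot from the persisted feature.
    void boot() {
        uses_schema_commitlog = feature_persisted;
        booted = true;
        log.push_back(uses_schema_commitlog ? "Using schema commit log."
                                            : "Not using schema commit log.");
    }

    // Called once all nodes in the cluster support the feature.
    // The running node records the fact but does not hot-switch domains.
    void on_feature_enabled() {
        assert(booted);
        feature_persisted = true;
        if (!uses_schema_commitlog) {
            log.push_back("Restart is needed for this to take effect.");
        }
    }
};
```

Calling boot(), then on_feature_enabled(), then boot() again models the
extra rolling restart: the node only starts using the schema commit log
on the second boot.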

Fixes #8272
Fixes #8309
Fixes #1459

---
diff --git a/db/schema_tables.cc b/db/schema_tables.cc
--- a/db/schema_tables.cc
+++ b/db/schema_tables.cc
@@ -907,6 +907,13 @@ future<> update_schema_version_and_announce(sharded<db::system_keyspace>& sys_ks
*/
future<> merge_schema(sharded<db::system_keyspace>& sys_ks, distributed<service::storage_proxy>& proxy, gms::feature_service& feat, std::vector<mutation> mutations)
{
+ if (this_shard_id() != 0) {
+ // mutations must be applied on the owning shard (0).
+ co_await smp::submit_to(0, [&, fmuts = freeze(mutations)] () mutable -> future<> {
+ return merge_schema(sys_ks, proxy, feat, unfreeze(fmuts));
+ });
+ co_return;
+ }
co_await with_merge_lock([&] () mutable -> future<> {
bool flush_schema = proxy.local().get_db().local().get_config().flush_schema_tables_after_modification();
co_await do_merge_schema(proxy, std::move(mutations), flush_schema);
@@ -1068,13 +1075,17 @@ static future<> do_merge_schema(distributed<service::storage_proxy>& proxy, std:
auto old_functions = co_await read_schema_for_keyspaces(proxy, FUNCTIONS, keyspaces);
auto old_aggregates = co_await read_schema_for_keyspaces(proxy, AGGREGATES, keyspaces);

- co_await proxy.local().mutate_locally(std::move(mutations), tracing::trace_state_ptr());
+ if (proxy.local().get_db().local().uses_schema_commitlog()) {
+ co_await proxy.local().get_db().local().apply(freeze(mutations), db::no_timeout);
+ } else {
+ co_await proxy.local().mutate_locally(std::move(mutations), tracing::trace_state_ptr());

- if (do_flush) {
- auto& db = proxy.local().local_db();
- co_await coroutine::parallel_for_each(column_families, [&db] (const utils::UUID& id) -> future<> {
- return db.flush_on_all(id);
- });
+ if (do_flush) {
+ auto& db = proxy.local().local_db();
+ co_await coroutine::parallel_for_each(column_families, [&db] (const utils::UUID& id) -> future<> {
+ return db.flush_on_all(id);
+ });
+ }
}

// with new data applied
diff --git a/db/schema_tables.hh b/db/schema_tables.hh
--- a/db/schema_tables.hh
+++ b/db/schema_tables.hh
@@ -10,6 +10,7 @@
#pragma once

#include "mutation.hh"
+#include "db/commitlog/replay_position.hh"
#include "schema_fwd.hh"
#include "schema_features.hh"
#include "hashing.hh"
@@ -98,6 +99,8 @@ namespace schema_tables {
using schema_result = std::map<sstring, lw_shared_ptr<query::result_set>>;
using schema_result_value_type = std::pair<sstring, lw_shared_ptr<query::result_set>>;

+const std::string COMMITLOG_FILENAME_PREFIX("SchemaLog-");
+
namespace v3 {

static constexpr auto NAME = "system_schema";
diff --git a/main.cc b/main.cc
--- a/main.cc
+++ b/main.cc
@@ -972,7 +972,8 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
// done only by shard 0, so we'll no longer face race conditions as
// described here: https://github.com/scylladb/scylla/issues/1014
supervisor::notify("loading system sstables");
- replica::distributed_loader::init_system_keyspace(db, ss, gossiper, *cfg, db::table_selector::all()).get();
+ auto system_keyspace_sel = db::table_selector::all_in_keyspace(db::system_keyspace::NAME);
+ replica::distributed_loader::init_system_keyspace(db, ss, gossiper, *cfg, *system_keyspace_sel).get();

smp::invoke_on_all([blocked_reactor_notify_ms] {
engine().update_blocked_reactor_notify_ms(blocked_reactor_notify_ms);
@@ -1073,29 +1074,79 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
sst_format_selector.stop().get();
});

+ // Re-enable previously enabled features on node startup.
+ // This should be done before commitlog starts replaying
+ // since some features affect storage.
+ db::system_keyspace::enable_features_on_startup(feature_service).get();
+
+ db.local().before_schema_keyspace_init();
+
+ // Init schema tables only after enable_features_on_startup()
+ // because table construction consults enabled features.
+ // Needs to be before system_keyspace::setup(), which writes to schema tables.
+ supervisor::notify("loading system_schema sstables");
+ auto schema_keyspace_sel = db::table_selector::all_in_keyspace(db::schema_tables::NAME);
+ replica::distributed_loader::init_system_keyspace(db, ss, gossiper, *cfg, *schema_keyspace_sel).get();
+
// schema migration, if needed, is also done on shard 0
db::legacy_schema_migrator::migrate(proxy, db, qp.local()).get();

// making compaction manager api available, after system keyspace has already been established.
api::set_server_compaction_manager(ctx).get();

- supervisor::notify("loading non-system sstables");
- replica::distributed_loader::init_non_system_keyspaces(db, proxy, sys_ks).get();
-
- supervisor::notify("starting view update generator");
- view_update_generator.start(std::ref(db)).get();
-
supervisor::notify("setting up system keyspace");
// FIXME -- should happen in start(), but
// 1. messaging is on the way with its preferred ip cache
// 2. cql_test_env() doesn't do it
// 3. need to check if it depends on any of the above steps
sys_ks.local().setup(messaging).get();

- // Re-enable previously enabled features on node startup.
- // This should be done before commitlog starts replaying
- // since some features affect storage.
- db::system_keyspace::enable_features_on_startup(feature_service).get();
+ supervisor::notify("starting schema commit log");
+
+ // Check there is no truncation record for schema tables.
+ // Needs to happen before replaying the schema commitlog, which interprets
+ // replay position in the truncation record.
+ // Needs to happen before system_keyspace::setup(), which reads truncation records.
+ for (auto&& e : db.local().get_column_families()) {
+ auto table_ptr = e.second;
+ if (table_ptr->schema()->ks_name() == db::schema_tables::NAME) {
+ if (table_ptr->get_truncation_record() != db_clock::time_point::min()) {
+ // replay_position stored in the truncation record may belong to
+ // the old (default) commitlog domain. It's not safe to interpret
+ // that replay position in the schema commitlog domain.
+ // Refuse to boot in this case. We assume no one truncated schema tables.
+ // We will hit this during rolling upgrade, in which case the user will
+ // roll back and let us know.
+ throw std::runtime_error(format("Schema table {}.{} has a truncation record. Booting is not safe.",
+ table_ptr->schema()->ks_name(), table_ptr->schema()->cf_name()));
+ }
+ }
+ }
+
+ auto sch_cl = db.local().schema_commitlog();
+ if (sch_cl != nullptr) {
+ auto paths = sch_cl->get_segments_to_replay();
+ if (!paths.empty()) {
+ supervisor::notify("replaying schema commit log");
+ auto rp = db::commitlog_replayer::create_replayer(db).get0();
+ rp.recover(paths, db::schema_tables::COMMITLOG_FILENAME_PREFIX).get();
+ supervisor::notify("replaying schema commit log - flushing memtables");
+ db.invoke_on_all([] (replica::database& db) {
+ return db.flush_all_memtables();
+ }).get();
+ supervisor::notify("replaying schema commit log - removing old commitlog segments");
+ //FIXME: discarded future
+ (void)sch_cl->delete_segments(std::move(paths));
+ }
+ }
+
+ db::schema_tables::recalculate_schema_version(sys_ks, proxy, feature_service.local()).get();
+
+ supervisor::notify("loading non-system sstables");
+ replica::distributed_loader::init_non_system_keyspaces(db, proxy, sys_ks).get();
+
+ supervisor::notify("starting view update generator");
+ view_update_generator.start(std::ref(db)).get();

supervisor::notify("starting commit log");
auto cl = db.local().commitlog();
diff --git a/replica/database.cc b/replica/database.cc
--- a/replica/database.cc
+++ b/replica/database.cc
@@ -895,14 +895,56 @@ static bool is_system_table(const schema& s) {
|| s.ks_name() == db::system_distributed_keyspace::NAME_EVERYWHERE;
}

+void database::before_schema_keyspace_init() {
+ assert(this_shard_id() == 0);
+
+ if (!_feat.schema_commitlog) {
+ dblog.info("Not using schema commit log.");
+ _listeners.push_back(_feat.schema_commitlog.when_enabled([] {
+ dblog.warn("All nodes can now switch to use the schema commit log. Restart is needed for this to take effect.");
+ }));
+ return;
+ }
+
+ dblog.info("Using schema commit log.");
+ _uses_schema_commitlog = true;
+
+ db::commitlog::config c;
+ c.commit_log_location = _cfg.commitlog_directory();
+ c.fname_prefix = db::schema_tables::COMMITLOG_FILENAME_PREFIX;
+ c.metrics_category_name = "schema-commitlog";
+ c.commitlog_total_space_in_mb = 10 >> 20;
+ c.commitlog_segment_size_in_mb = _cfg.commitlog_segment_size_in_mb();
+ c.commitlog_sync_period_in_ms = _cfg.commitlog_sync_period_in_ms();
+ c.mode = db::commitlog::sync_mode::BATCH;
+ c.extensions = &_cfg.extensions();
+ c.use_o_dsync = _cfg.commitlog_use_o_dsync();
+ c.allow_going_over_size_limit = true; // for lower latency
+
+ _schema_commitlog = std::make_unique<db::commitlog>(db::commitlog::create_commitlog(std::move(c)).get0());
+ _schema_commitlog->add_flush_handler([this] (db::cf_id_type id, db::replay_position pos) {
+ if (!_column_families.contains(id)) {
+ // the CF has been removed.
+ _schema_commitlog->discard_completed_segments(id);
+ return;
+ }
+ // Initiate a background flush. Waited upon in `stop()`.
+ (void)_column_families[id]->flush(pos);
+
+ }).release();
+}
+
void database::add_column_family(keyspace& ks, schema_ptr schema, column_family::config cfg) {
schema = local_schema_registry().learn(schema);
schema->registry_entry()->mark_synced();
// avoid self-reporting
auto& sst_manager = is_system_table(*schema) ? get_system_sstables_manager() : get_user_sstables_manager();
lw_shared_ptr<column_family> cf;
if (cfg.enable_commitlog && _commitlog) {
- cf = make_lw_shared<column_family>(schema, std::move(cfg), *_commitlog, *_compaction_manager, sst_manager, *_cl_stats, _row_cache_tracker);
+ db::commitlog& cl = schema->ks_name() == db::schema_tables::NAME && _uses_schema_commitlog
+ ? *_schema_commitlog
+ : *_commitlog;
+ cf = make_lw_shared<column_family>(schema, std::move(cfg), cl, *_compaction_manager, sst_manager, *_cl_stats, _row_cache_tracker);
} else {
cf = make_lw_shared<column_family>(schema, std::move(cfg), column_family::no_commitlog(), *_compaction_manager, sst_manager, *_cl_stats, _row_cache_tracker);
}
@@ -2178,10 +2220,16 @@ future<> database::stop() {
if (_commitlog) {
co_await _commitlog->shutdown();
}
+ if (_schema_commitlog) {
+ co_await _schema_commitlog->shutdown();
+ }
co_await _view_update_concurrency_sem.wait(max_memory_pending_view_updates());
if (_commitlog) {
co_await _commitlog->release();
}
+ if (_schema_commitlog) {
+ co_await _schema_commitlog->release();
+ }
co_await _system_dirty_memory_manager.shutdown();
co_await _dirty_memory_manager.shutdown();
co_await _memtable_controller.shutdown();
@@ -2265,6 +2313,13 @@ future<> database::truncate(const keyspace& ks, column_family& cf, timestamp_fun
const auto auto_snapshot = with_snapshot && get_config().auto_snapshot();
const auto should_flush = auto_snapshot;

+ // Schema tables changed commitlog domain at some point and this node will refuse to boot with
+ // truncation record present for schema tables to protect against misinterpreting of replay positions.
+ // Also, the replay_position returned by discard_sstables() may refer to old commit log domain.
+ if (cf.schema()->ks_name() == db::schema_tables::NAME) {
+ throw std::runtime_error(format("Truncating of {}.{} is not allowed.", cf.schema()->ks_name(), cf.schema()->cf_name()));
+ }
+
// Force mutations coming in to re-acquire higher rp:s
// This creates a "soft" ordering, in that we will guarantee that
// any sstable written _after_ we issue the flush below will
@@ -2542,6 +2597,9 @@ future<> database::drain() {
co_await flush_system_column_families();
co_await _stop_barrier.arrive_and_wait();
co_await _commitlog->shutdown();
+ if (_schema_commitlog) {
+ co_await _schema_commitlog->shutdown();
+ }
b.cancel();
}

diff --git a/replica/database.hh b/replica/database.hh
--- a/replica/database.hh
+++ b/replica/database.hh
@@ -1329,6 +1329,7 @@ private:
flat_hash_map<std::pair<sstring, sstring>, utils::UUID, utils::tuple_hash, string_pair_eq>;
ks_cf_to_uuid_t _ks_cf_to_uuid;
std::unique_ptr<db::commitlog> _commitlog;
+ std::unique_ptr<db::commitlog> _schema_commitlog;
utils::updateable_value_source<utils::UUID> _version;
uint32_t _schema_change_count = 0;
// compaction_manager object is referenced by all column families of a database.
@@ -1337,6 +1338,7 @@ private:
bool _enable_incremental_backups = false;
bool _shutdown = false;
bool _enable_autocompaction_toggle = false;
+ bool _uses_schema_commitlog = false;
query::querier_cache _querier_cache;

std::unique_ptr<db::large_data_handler> _large_data_handler;
@@ -1352,6 +1354,7 @@ private:

service::migration_notifier& _mnotifier;
gms::feature_service& _feat;
+ std::vector<std::any> _listeners;
const locator::shared_token_metadata& _shared_token_metadata;

sharded<semaphore>& _sst_dir_semaphore;
@@ -1447,6 +1450,9 @@ public:
db::commitlog* commitlog() const {
return _commitlog.get();
}
+ db::commitlog* schema_commitlog() const {
+ return _schema_commitlog.get();
+ }
replica::cf_stats* cf_stats() {
return &_cf_stats;
}
@@ -1468,6 +1474,7 @@ public:
const service::migration_notifier& get_notifier() const { return _mnotifier; }

void add_column_family(keyspace& ks, schema_ptr schema, column_family::config cfg);
+ void before_schema_keyspace_init();
future<> add_column_family_and_make_directory(schema_ptr schema);

/* throws no_such_column_family if missing */
@@ -1684,6 +1691,10 @@ public:
sharded<semaphore>& get_sharded_sst_dir_semaphore() {
return _sst_dir_semaphore;
}
+
+ bool uses_schema_commitlog() const {
+ return _uses_schema_commitlog;
+ }
};

} // namespace replica

Commit Bot

<bot@cloudius-systems.com>
Jul 7, 2022, 3:12:12 PM
to scylladb-dev@googlegroups.com, Tomasz Grabiec
From: Tomasz Grabiec <tgra...@scylladb.com>
Committer: Tomasz Grabiec <tgra...@scylladb.com>
Branch: master