Currently the following can happen:
1) there's ongoing compaction with input sstable A, so sstable set
and backlog tracker both contain A.
2) ongoing compaction replaces input sstable A by B, so sstable set
contains only B now.
3) schema is updated, so a new backlog tracker is built without A
because sstable set now contains only B.
4) ongoing compaction tries to remove A from tracker, but it was
excluded in step 3.
5) tracker can now have a negative value if table is decreasing in
size, which leads to log(<negative number>) == -NaN
This problem happens because backlog tracker updates are decoupled
from sstable set updates. Given that the content of backlog tracker
should follow exactly the content of sstable set, let's move tracker
management to table. Whenever sstable set is updated, backlog tracker
will be updated with the same changes.
Fixes #9157
test: mode(debug).
Signed-off-by: Raphael S. Carvalho <
raph...@scylladb.com>
---
v2: remove monitor of exhausted sstables once sstable set and
backlog tracker have been updated
---
compaction/compaction.cc | 71 +++++++---------------------------------
database.hh | 2 ++
table.cc | 14 ++++++++
3 files changed, 27 insertions(+), 60 deletions(-)
diff --git a/compaction/compaction.cc b/compaction/compaction.cc
index ffb16cfba..93ea1673c 100644
--- a/compaction/compaction.cc
+++ b/compaction/compaction.cc
@@ -333,15 +333,6 @@ struct compaction_read_monitor_generator final : public read_monitor_generator {
return _last_position_seen;
}
- void remove_sstable(bool is_tracking) {
- if (is_tracking && _sst) {
- _cf.get_compaction_strategy().get_backlog_tracker().remove_sstable(_sst);
- } else if (_sst) {
- _cf.get_compaction_strategy().get_backlog_tracker().revert_charges(_sst);
- }
- _sst = {};
- }
-
compaction_read_monitor(sstables::shared_sstable sst, column_family &cf)
: _sst(std::move(sst)), _cf(cf) { }
@@ -357,30 +348,26 @@ struct compaction_read_monitor_generator final : public read_monitor_generator {
};
virtual sstables::read_monitor& operator()(sstables::shared_sstable sst) override {
- _generated_monitors.emplace_back(std::move(sst), _cf);
- return _generated_monitors.back();
+ _generated_monitors.emplace_back(std::make_unique<compaction_read_monitor>(std::move(sst), _cf));
+ return *_generated_monitors.back();
}
explicit compaction_read_monitor_generator(column_family& cf)
: _cf(cf) {}
- void remove_sstables(bool is_tracking) {
- for (auto& rm : _generated_monitors) {
- rm.remove_sstable(is_tracking);
- }
- }
-
- void remove_sstable(bool is_tracking, sstables::shared_sstable& sst) {
- for (auto& rm : _generated_monitors) {
- if (rm._sst == sst) {
- rm.remove_sstable(is_tracking);
- break;
+ void release_exhausted_sstables(std::vector<sstables::shared_sstable> exhausted_sstables) {
+ // Release monitor of exhausted sstables (which are supposed to be deregistered from tracker by now) to
+ // guarantee incremental compaction works.
+ auto s = boost::copy_range<std::unordered_set<sstables::shared_sstable>>(std::move(exhausted_sstables));
+ for (auto& monitor : _generated_monitors) {
+ if (monitor && s.contains(monitor->_sst)) {
+ monitor.reset();
}
}
}
private:
column_family& _cf;
- std::deque<compaction_read_monitor> _generated_monitors;
+ std::vector<std::unique_ptr<compaction_read_monitor>> _generated_monitors;
};
// Writes a temporary sstable run containing only garbage collected data.
@@ -740,8 +727,6 @@ class compaction {
std::chrono::duration_cast<std::chrono::milliseconds>(duration).count(), pretty_printed_throughput(_info->end_size, duration),
_info->total_partitions, _info->total_keys_written);
- backlog_tracker_adjust_charges();
-
auto info = std::move(_info);
_cf.get_compaction_manager().deregister_compaction(info);
return std::move(*info);
@@ -749,7 +734,6 @@ class compaction {
virtual std::string_view report_start_desc() const = 0;
virtual std::string_view report_finish_desc() const = 0;
- virtual void backlog_tracker_adjust_charges() { };
std::function<api::timestamp_type(const dht::decorated_key&)> max_purgeable_func() {
if (!tombstone_expiration_enabled()) {
@@ -951,7 +935,6 @@ class reshape_compaction : public compaction {
class regular_compaction : public compaction {
// sstable being currently written.
mutable compaction_read_monitor_generator _monitor_generator;
- std::vector<shared_sstable> _unused_sstables = {};
public:
regular_compaction(column_family& cf, compaction_descriptor descriptor)
: compaction(cf, std::move(descriptor))
@@ -979,19 +962,9 @@ class regular_compaction : public compaction {
return "Compacted";
}
- void backlog_tracker_adjust_charges() override {
- _monitor_generator.remove_sstables(_info->tracking);
- auto& tracker = _cf.get_compaction_strategy().get_backlog_tracker();
- for (auto& sst : _unused_sstables) {
- tracker.add_sstable(sst);
- }
- _unused_sstables.clear();
- }
-
virtual compaction_writer create_compaction_writer(const dht::decorated_key& dk) override {
auto sst = _sstable_creator(this_shard_id());
setup_new_sstable(sst);
- _unused_sstables.push_back(sst);
auto monitor = std::make_unique<compaction_write_monitor>(sst, _cf, maximum_timestamp(), _sstable_level);
sstable_writer_config cfg = make_sstable_writer_config(_info->type);
@@ -1020,26 +993,6 @@ class regular_compaction : public compaction {
_monitor_generator(std::move(sstable));
}
private:
- void backlog_tracker_incrementally_adjust_charges(std::vector<shared_sstable> exhausted_sstables) {
- //
- // Notify backlog tracker of an early sstable replacement triggered by incremental compaction approach.
- // Backlog tracker will be told that the exhausted sstables aren't being compacted anymore, and the
- // new sstables, which replaced the exhausted ones, are not partially written sstables and they can
- // be added to tracker like any other regular sstable in the table's set.
- // This way we prevent bogus calculation of backlog due to lack of charge adjustment whenever there's
- // an early sstable replacement.
- //
-
- for (auto& sst : exhausted_sstables) {
- _monitor_generator.remove_sstable(_info->tracking, sst);
- }
- auto& tracker = _cf.get_compaction_strategy().get_backlog_tracker();
- for (auto& sst : _unused_sstables) {
- tracker.add_sstable(sst);
- }
- _unused_sstables.clear();
- }
-
void maybe_replace_exhausted_sstables_by_sst(shared_sstable sst) {
// Skip earlier replacement of exhausted sstables if compaction works with only single-fragment runs,
// meaning incremental compaction is disabled for this compaction.
@@ -1072,7 +1025,7 @@ class regular_compaction : public compaction {
auto exhausted_ssts = std::vector<shared_sstable>(exhausted, _sstables.end());
_replacer(get_compaction_completion_desc(exhausted_ssts, std::move(_new_unused_sstables)));
_sstables.erase(exhausted, _sstables.end());
- backlog_tracker_incrementally_adjust_charges(std::move(exhausted_ssts));
+ _monitor_generator.release_exhausted_sstables(std::move(exhausted_ssts));
}
}
@@ -1553,8 +1506,6 @@ class resharding_compaction final : public compaction {
return "Resharded";
}
- void backlog_tracker_adjust_charges() override { }
-
compaction_writer create_compaction_writer(const dht::decorated_key& dk) override {
auto shard = dht::shard_of(*_schema, dk.token());
auto sst = _sstable_creator(shard);
diff --git a/database.hh b/database.hh
index 31b88e4d5..cbc2aa543 100644
--- a/database.hh
+++ b/database.hh
@@ -536,6 +536,8 @@ class table : public enable_lw_shared_from_this<table> {
void add_maintenance_sstable(sstables::shared_sstable sst);
static void add_sstable_to_backlog_tracker(compaction_backlog_tracker& tracker, sstables::shared_sstable sstable);
static void remove_sstable_from_backlog_tracker(compaction_backlog_tracker& tracker, sstables::shared_sstable sstable);
+ // Update compaction backlog tracker with the same changes applied to the underlying sstable set.
+ void backlog_tracker_adjust_charges(const std::vector<sstables::shared_sstable>& old_sstables, const std::vector<sstables::shared_sstable>& new_sstables);
lw_shared_ptr<memtable> new_memtable();
future<stop_iteration> try_flush_memtable_to_sstable(lw_shared_ptr<memtable> memt, sstable_write_permit&& permit);
// Caller must keep m alive.
diff --git a/table.cc b/table.cc
index 3bc1b80b5..014ed221e 100644
--- a/table.cc
+++ b/table.cc
@@ -325,6 +325,16 @@ inline void table::remove_sstable_from_backlog_tracker(compaction_backlog_tracke
tracker.remove_sstable(std::move(sstable));
}
+void table::backlog_tracker_adjust_charges(const std::vector<sstables::shared_sstable>& old_sstables, const std::vector<sstables::shared_sstable>& new_sstables) {
+ auto& tracker = _compaction_strategy.get_backlog_tracker();
+ for (auto& sst : new_sstables) {
+ tracker.add_sstable(sst);
+ }
+ for (auto& sst : old_sstables) {
+ tracker.remove_sstable(sst);
+ }
+}
+
lw_shared_ptr<sstables::sstable_set>
table::do_add_sstable(lw_shared_ptr<sstables::sstable_set> sstables, sstables::shared_sstable sstable,
enable_backlog_tracker backlog_tracker) {
@@ -772,6 +782,9 @@ table::update_sstable_lists_on_off_strategy_completion(const std::vector<sstable
_t._main_sstables = std::move(_new_main_list);
_t._maintenance_sstables = std::move(_new_maintenance_list);
_t.refresh_compound_sstable_set();
+ // Input sstables aren't removed from the tracker because the maintenance set isn't tracked, whereas
+ // output sstables are added to the tracker because the main set is tracked.
+ _t.backlog_tracker_adjust_charges({}, _new_main);
}
static std::unique_ptr<row_cache::external_updater_impl> make(table& t, sstable_list_builder::permit_t permit, const sstables_t& old_maintenance, const sstables_t& new_main) {
return std::make_unique<sstable_lists_updater>(t, std::move(permit), old_maintenance, new_main);
@@ -843,6 +856,7 @@ table::on_compaction_completion(sstables::compaction_completion_desc& desc) {
virtual void execute() override {
_t._main_sstables = std::move(_new_sstables);
_t.refresh_compound_sstable_set();
+ _t.backlog_tracker_adjust_charges(_desc.old_sstables, _desc.new_sstables);
}
static std::unique_ptr<row_cache::external_updater_impl> make(table& t, sstable_list_builder::permit_t permit, sstables::compaction_completion_desc& d) {
return std::make_unique<sstable_list_updater>(t, std::move(permit), d);
--
2.31.1