[PATCH v2] compaction: Update backlog tracker correctly when schema is updated

0 views
Skip to first unread message

Raphael S. Carvalho

unread,
Sep 14, 2021, 4:18:12 PMSep 14
to scylla...@googlegroups.com, Raphael S. Carvalho
Currently the following can happen:
1) there's ongoing compaction with input sstable A, so sstable set
and backlog tracker both contains A.
2) ongoing compaction replaces input sstable A by B, so sstable set
contains only B now.
3) schema is updated, so a new backlog tracker is built without A
because sstable set now contains only B.
4) ongoing compaction tries to remove A from tracker, but it was
excluded in step 3.
5) tracker can now have a negative value if table is decreasing in
size, which leads to log(<negative number>) == -NaN

This problem happens because backlog tracker updates are decoupled
from sstable set updates. Given that the content of backlog tracker
should follow exactly the content of sstable set, let's move tracker
management to table. Whenever sstable set is updated, backlog tracker
will be updated with the same changes.

Fixes #9157

test: mode(debug).

Signed-off-by: Raphael S. Carvalho <raph...@scylladb.com>
---
v2: remove monitor of exhausted sstables once sstable set and
backlog tracker have been updated
---
compaction/compaction.cc | 71 +++++++---------------------------------
database.hh | 2 ++
table.cc | 14 ++++++++
3 files changed, 27 insertions(+), 60 deletions(-)

diff --git a/compaction/compaction.cc b/compaction/compaction.cc
index ffb16cfba..93ea1673c 100644
--- a/compaction/compaction.cc
+++ b/compaction/compaction.cc
@@ -333,15 +333,6 @@ struct compaction_read_monitor_generator final : public read_monitor_generator {
return _last_position_seen;
}

- void remove_sstable(bool is_tracking) {
- if (is_tracking && _sst) {
- _cf.get_compaction_strategy().get_backlog_tracker().remove_sstable(_sst);
- } else if (_sst) {
- _cf.get_compaction_strategy().get_backlog_tracker().revert_charges(_sst);
- }
- _sst = {};
- }
-
compaction_read_monitor(sstables::shared_sstable sst, column_family &cf)
: _sst(std::move(sst)), _cf(cf) { }

@@ -357,30 +348,26 @@ struct compaction_read_monitor_generator final : public read_monitor_generator {
};

virtual sstables::read_monitor& operator()(sstables::shared_sstable sst) override {
- _generated_monitors.emplace_back(std::move(sst), _cf);
- return _generated_monitors.back();
+ _generated_monitors.emplace_back(std::make_unique<compaction_read_monitor>(std::move(sst), _cf));
+ return *_generated_monitors.back();
}

explicit compaction_read_monitor_generator(column_family& cf)
: _cf(cf) {}

- void remove_sstables(bool is_tracking) {
- for (auto& rm : _generated_monitors) {
- rm.remove_sstable(is_tracking);
- }
- }
-
- void remove_sstable(bool is_tracking, sstables::shared_sstable& sst) {
- for (auto& rm : _generated_monitors) {
- if (rm._sst == sst) {
- rm.remove_sstable(is_tracking);
- break;
+ void release_exhausted_sstables(std::vector<sstables::shared_sstable> exhausted_sstables) {
+ // Release monitor of exhausted sstables (which are supposed to be deregistered from tracker by now) to
+ // guarantee incremental compaction works.
+ auto s = boost::copy_range<std::unordered_set<sstables::shared_sstable>>(std::move(exhausted_sstables));
+ for (auto& monitor : _generated_monitors) {
+ if (monitor && s.contains(monitor->_sst)) {
+ monitor.reset();
}
}
}
private:
column_family& _cf;
- std::deque<compaction_read_monitor> _generated_monitors;
+ std::vector<std::unique_ptr<compaction_read_monitor>> _generated_monitors;
};

// Writes a temporary sstable run containing only garbage collected data.
@@ -740,8 +727,6 @@ class compaction {
std::chrono::duration_cast<std::chrono::milliseconds>(duration).count(), pretty_printed_throughput(_info->end_size, duration),
_info->total_partitions, _info->total_keys_written);

- backlog_tracker_adjust_charges();
-
auto info = std::move(_info);
_cf.get_compaction_manager().deregister_compaction(info);
return std::move(*info);
@@ -749,7 +734,6 @@ class compaction {

virtual std::string_view report_start_desc() const = 0;
virtual std::string_view report_finish_desc() const = 0;
- virtual void backlog_tracker_adjust_charges() { };

std::function<api::timestamp_type(const dht::decorated_key&)> max_purgeable_func() {
if (!tombstone_expiration_enabled()) {
@@ -951,7 +935,6 @@ class reshape_compaction : public compaction {
class regular_compaction : public compaction {
// sstable being currently written.
mutable compaction_read_monitor_generator _monitor_generator;
- std::vector<shared_sstable> _unused_sstables = {};
public:
regular_compaction(column_family& cf, compaction_descriptor descriptor)
: compaction(cf, std::move(descriptor))
@@ -979,19 +962,9 @@ class regular_compaction : public compaction {
return "Compacted";
}

- void backlog_tracker_adjust_charges() override {
- _monitor_generator.remove_sstables(_info->tracking);
- auto& tracker = _cf.get_compaction_strategy().get_backlog_tracker();
- for (auto& sst : _unused_sstables) {
- tracker.add_sstable(sst);
- }
- _unused_sstables.clear();
- }
-
virtual compaction_writer create_compaction_writer(const dht::decorated_key& dk) override {
auto sst = _sstable_creator(this_shard_id());
setup_new_sstable(sst);
- _unused_sstables.push_back(sst);

auto monitor = std::make_unique<compaction_write_monitor>(sst, _cf, maximum_timestamp(), _sstable_level);
sstable_writer_config cfg = make_sstable_writer_config(_info->type);
@@ -1020,26 +993,6 @@ class regular_compaction : public compaction {
_monitor_generator(std::move(sstable));
}
private:
- void backlog_tracker_incrementally_adjust_charges(std::vector<shared_sstable> exhausted_sstables) {
- //
- // Notify backlog tracker of an early sstable replacement triggered by incremental compaction approach.
- // Backlog tracker will be told that the exhausted sstables aren't being compacted anymore, and the
- // new sstables, which replaced the exhausted ones, are not partially written sstables and they can
- // be added to tracker like any other regular sstable in the table's set.
- // This way we prevent bogus calculation of backlog due to lack of charge adjustment whenever there's
- // an early sstable replacement.
- //
-
- for (auto& sst : exhausted_sstables) {
- _monitor_generator.remove_sstable(_info->tracking, sst);
- }
- auto& tracker = _cf.get_compaction_strategy().get_backlog_tracker();
- for (auto& sst : _unused_sstables) {
- tracker.add_sstable(sst);
- }
- _unused_sstables.clear();
- }
-
void maybe_replace_exhausted_sstables_by_sst(shared_sstable sst) {
// Skip earlier replacement of exhausted sstables if compaction works with only single-fragment runs,
// meaning incremental compaction is disabled for this compaction.
@@ -1072,7 +1025,7 @@ class regular_compaction : public compaction {
auto exhausted_ssts = std::vector<shared_sstable>(exhausted, _sstables.end());
_replacer(get_compaction_completion_desc(exhausted_ssts, std::move(_new_unused_sstables)));
_sstables.erase(exhausted, _sstables.end());
- backlog_tracker_incrementally_adjust_charges(std::move(exhausted_ssts));
+ _monitor_generator.release_exhausted_sstables(std::move(exhausted_ssts));
}
}

@@ -1553,8 +1506,6 @@ class resharding_compaction final : public compaction {
return "Resharded";
}

- void backlog_tracker_adjust_charges() override { }
-
compaction_writer create_compaction_writer(const dht::decorated_key& dk) override {
auto shard = dht::shard_of(*_schema, dk.token());
auto sst = _sstable_creator(shard);
diff --git a/database.hh b/database.hh
index 31b88e4d5..cbc2aa543 100644
--- a/database.hh
+++ b/database.hh
@@ -536,6 +536,8 @@ class table : public enable_lw_shared_from_this<table> {
void add_maintenance_sstable(sstables::shared_sstable sst);
static void add_sstable_to_backlog_tracker(compaction_backlog_tracker& tracker, sstables::shared_sstable sstable);
static void remove_sstable_from_backlog_tracker(compaction_backlog_tracker& tracker, sstables::shared_sstable sstable);
+ // Update compaction backlog tracker with the same changes applied to the underlying sstable set.
+ void backlog_tracker_adjust_charges(const std::vector<sstables::shared_sstable>& old_sstables, const std::vector<sstables::shared_sstable>& new_sstables);
lw_shared_ptr<memtable> new_memtable();
future<stop_iteration> try_flush_memtable_to_sstable(lw_shared_ptr<memtable> memt, sstable_write_permit&& permit);
// Caller must keep m alive.
diff --git a/table.cc b/table.cc
index 3bc1b80b5..014ed221e 100644
--- a/table.cc
+++ b/table.cc
@@ -325,6 +325,16 @@ inline void table::remove_sstable_from_backlog_tracker(compaction_backlog_tracke
tracker.remove_sstable(std::move(sstable));
}

+void table::backlog_tracker_adjust_charges(const std::vector<sstables::shared_sstable>& old_sstables, const std::vector<sstables::shared_sstable>& new_sstables) {
+ auto& tracker = _compaction_strategy.get_backlog_tracker();
+ for (auto& sst : new_sstables) {
+ tracker.add_sstable(sst);
+ }
+ for (auto& sst : old_sstables) {
+ tracker.remove_sstable(sst);
+ }
+}
+
lw_shared_ptr<sstables::sstable_set>
table::do_add_sstable(lw_shared_ptr<sstables::sstable_set> sstables, sstables::shared_sstable sstable,
enable_backlog_tracker backlog_tracker) {
@@ -772,6 +782,9 @@ table::update_sstable_lists_on_off_strategy_completion(const std::vector<sstable
_t._main_sstables = std::move(_new_main_list);
_t._maintenance_sstables = std::move(_new_maintenance_list);
_t.refresh_compound_sstable_set();
+ // Input sstables aren't removed from tracker because maintenance set isn't tracked, whereas
+ // output sstables are added to tracker because main set is tracked.
+ _t.backlog_tracker_adjust_charges({}, _new_main);
}
static std::unique_ptr<row_cache::external_updater_impl> make(table& t, sstable_list_builder::permit_t permit, const sstables_t& old_maintenance, const sstables_t& new_main) {
return std::make_unique<sstable_lists_updater>(t, std::move(permit), old_maintenance, new_main);
@@ -843,6 +856,7 @@ table::on_compaction_completion(sstables::compaction_completion_desc& desc) {
virtual void execute() override {
_t._main_sstables = std::move(_new_sstables);
_t.refresh_compound_sstable_set();
+ _t.backlog_tracker_adjust_charges(_desc.old_sstables, _desc.new_sstables);
}
static std::unique_ptr<row_cache::external_updater_impl> make(table& t, sstable_list_builder::permit_t permit, sstables::compaction_completion_desc& d) {
return std::make_unique<sstable_list_updater>(t, std::move(permit), d);
--
2.31.1

Benny Halevy

unread,
Sep 20, 2021, 6:23:35 AMSep 20
to Raphael S. Carvalho, scylla...@googlegroups.com
Raphael, in general, the change is hard to review.
Can you separate it into a small series of incremental patches?

See also comments inline below.
here do we delete the entry?
It looks like we just keep adding monitors to _generated_monitors.

Can _generate_monitors become a std::unordered_map<shared_sstable, std::vector<compaction_read_monitor>>?
This way we can simply delete the whole vector here in O(1).

Raphael S. Carvalho

unread,
Sep 20, 2021, 9:59:50 AMSep 20
to Benny Halevy, scylladb-dev
On Mon, Sep 20, 2021 at 7:23 AM Benny Halevy <bha...@scylladb.com> wrote:
>
> Raphael, in general, the change is hard to review.
> Can you separate it into a small series of incremental patches?

I can probably move one or other preparatory change into another
patch, but most of these changes have to go into a single patch for
correctness and bisectability.
there will be as many monitors as there are input sstables, so it's
very limited, but I can perhaps actually remove the monitor from
vector instead of resetting its ptr.

>
> Can _generate_monitors become a std::unordered_map<shared_sstable, std::vector<compaction_read_monitor>>?
> This way we can simply delete the whole vector here in O(1).

there's 1:1 mapping between input sstable and monitor. during a
compaction this procedure will be called only a (fraction on # of
input sstables) times, I think vector is good enough, but we can also
go with unordered_map<sstable, monitor> which simplifies deletion. I
will proceed with the latter then.

Raphael S. Carvalho

unread,
Sep 20, 2021, 4:16:57 PMSep 20
to scylla...@googlegroups.com, Raphael S. Carvalho
Backlog tracker isn't updated correctly when facing a schema change, and
may leak a SSTable if compaction strategy is changed, which causes
backlog to be computed incorrectly. Most of these problems happen because
sstable set and tracker are updated independently, so it could happen
that tracker lose track (pun intended) of changes applied to set.

The first patch will fix the leak when strategy is changed, and the third
patch will make sure that tracker is updated atomically with sstable set,
so these kind of problems will not happen anymore.

Fixes #9157

Also at: https://github.com/raphaelsc/scylla.git fixes_to_backlog_tracker_v3

test: mode(debug)

v3:
- rebased
- push preparatory changes to standalone patches to make review easier
- use unordered_map for generated monitors to make removal easier
- revive patch which removes tracking machinery to fix leaks


Raphael S. Carvalho (4):
compaction: simplify removal of monitors
compaction: introduce
compaction_read_monitor_generator::remove_exhausted_sstables()
compaction: Don't leak backlog of input sstable when compaction
strategy is changed
compaction: Update backlog tracker correctly when schema is updated

compaction/compaction.hh | 5 --
compaction/compaction_manager.hh | 4 --
database.hh | 2 +
compaction/compaction.cc | 69 ++++-----------------------
compaction/compaction_manager.cc | 8 ----
table.cc | 17 +++++--
test/boost/sstable_compaction_test.cc | 15 ++----
7 files changed, 27 insertions(+), 93 deletions(-)

--
2.31.1

Raphael S. Carvalho

unread,
Sep 20, 2021, 4:16:58 PMSep 20
to scylla...@googlegroups.com, Raphael S. Carvalho
by switching to unordered_map, removal of generated monitors is
made easier. this is a preparatory change for patch which will
remove monitor for all exhausted sstables

Signed-off-by: Raphael S. Carvalho <raph...@scylladb.com>
---
compaction/compaction.cc | 17 ++++++++---------
1 file changed, 8 insertions(+), 9 deletions(-)

diff --git a/compaction/compaction.cc b/compaction/compaction.cc
index e3efbbe08..22d093e94 100644
--- a/compaction/compaction.cc
+++ b/compaction/compaction.cc
@@ -368,30 +368,29 @@ struct compaction_read_monitor_generator final : public read_monitor_generator {
};

virtual sstables::read_monitor& operator()(sstables::shared_sstable sst) override {
- _generated_monitors.emplace_back(std::move(sst), _cf);
- return _generated_monitors.back();
+ auto p = _generated_monitors.emplace(sst, compaction_read_monitor(sst, _cf));
+ return p.first->second;
}

explicit compaction_read_monitor_generator(column_family& cf)
: _cf(cf) {}

void remove_sstables(bool is_tracking) {
- for (auto& rm : _generated_monitors) {
+ for (auto& rm : _generated_monitors | boost::adaptors::map_values) {
rm.remove_sstable(is_tracking);
}
}

void remove_sstable(bool is_tracking, sstables::shared_sstable& sst) {
- for (auto& rm : _generated_monitors) {
- if (rm._sst == sst) {
- rm.remove_sstable(is_tracking);
- break;
- }
+ auto it = _generated_monitors.find(sst);
+ if (it != _generated_monitors.end()) {
+ it->second.remove_sstable(is_tracking);
+ _generated_monitors.erase(it);
}
}
private:
column_family& _cf;
- std::deque<compaction_read_monitor> _generated_monitors;
+ std::unordered_map<sstables::shared_sstable, compaction_read_monitor> _generated_monitors;
};

// Writes a temporary sstable run containing only garbage collected data.
--
2.31.1

Raphael S. Carvalho

unread,
Sep 20, 2021, 4:16:59 PMSep 20
to scylla...@googlegroups.com, Raphael S. Carvalho
This new function makes it easier to remove monitor of exhausted
sstables.

Signed-off-by: Raphael S. Carvalho <raph...@scylladb.com>
---
compaction/compaction.cc | 16 ++++++++--------
1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/compaction/compaction.cc b/compaction/compaction.cc
index 22d093e94..5801f4806 100644
--- a/compaction/compaction.cc
+++ b/compaction/compaction.cc
@@ -381,11 +381,13 @@ struct compaction_read_monitor_generator final : public read_monitor_generator {
}
}

- void remove_sstable(bool is_tracking, sstables::shared_sstable& sst) {
- auto it = _generated_monitors.find(sst);
- if (it != _generated_monitors.end()) {
- it->second.remove_sstable(is_tracking);
- _generated_monitors.erase(it);
+ void remove_exhausted_sstables(bool is_tracking, const std::vector<sstables::shared_sstable>& exhausted_sstables) {
+ for (auto &sst : exhausted_sstables) {
+ auto it = _generated_monitors.find(sst);
+ if (it != _generated_monitors.end()) {
+ it->second.remove_sstable(is_tracking);
+ _generated_monitors.erase(it);
+ }
}
}
private:
@@ -1040,9 +1042,7 @@ class regular_compaction : public compaction {
// an early sstable replacement.
//

- for (auto& sst : exhausted_sstables) {
- _monitor_generator.remove_sstable(_info->tracking, sst);
- }
+ _monitor_generator.remove_exhausted_sstables(_info->tracking, exhausted_sstables);
auto& tracker = _cf.get_compaction_strategy().get_backlog_tracker();
for (auto& sst : _unused_sstables) {
tracker.add_sstable(sst);
--
2.31.1

Raphael S. Carvalho

unread,
Sep 20, 2021, 4:17:02 PMSep 20
to scylla...@googlegroups.com, Raphael S. Carvalho
The generic backlog formula is: ALL + PARTIAL - COMPACTING

With transfer_ongoing_charges() we already ignore the effect of
ongoing compactions on COMPACTING as we judge them to be pointless.

But ongoing compactions will run to completion, meaning that output
sstables will be added to ALL anyway, in the formula above.

With stop_tracking_ongoing_compactions(), input sstables are never
removed from the tracker, but output sstables are added, which means
we end up with duplicate backlog in the tracker.

By removing this tracking mechanism, pointless ongoing compaction
will be ignored as expected and the leaks will be fixed.

Later, the intention is to force a stop on ongoing compactions if
strategy has changed as they're pointless anyway.

Signed-off-by: Raphael S. Carvalho <raph...@scylladb.com>
---
compaction/compaction.hh | 5 -----
compaction/compaction_manager.hh | 4 ----
compaction/compaction.cc | 18 ++++++++----------
compaction/compaction_manager.cc | 8 --------
table.cc | 4 ----
test/boost/sstable_compaction_test.cc | 15 +++------------
6 files changed, 11 insertions(+), 43 deletions(-)

diff --git a/compaction/compaction.hh b/compaction/compaction.hh
index d6fabcd8d..5241cd674 100644
--- a/compaction/compaction.hh
+++ b/compaction/compaction.hh
@@ -66,7 +66,6 @@ namespace sstables {
int64_t ended_at;
std::vector<shared_sstable> new_sstables;
sstring stop_requested;
- bool tracking = true;
utils::UUID run_identifier;
utils::UUID compaction_uuid;
struct replacement {
@@ -82,10 +81,6 @@ namespace sstables {
void stop(sstring reason) {
stop_requested = std::move(reason);
}
-
- void stop_tracking() {
- tracking = false;
- }
};

// Compact a list of N sstables into M sstables.
diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh
index aab63a774..ed58ecb95 100644
--- a/compaction/compaction_manager.hh
+++ b/compaction/compaction_manager.hh
@@ -241,10 +241,6 @@ class compaction_manager {
// Cancel requests on cf and wait for a possible ongoing compaction on cf.
future<> remove(column_family* cf);

- // No longer interested in tracking backlog for compactions in this column
- // family. For instance, we could be ALTERing TABLE to a different strategy.
- void stop_tracking_ongoing_compactions(column_family* cf);
-
const stats& get_stats() const {
return _stats;
}
diff --git a/compaction/compaction.cc b/compaction/compaction.cc
index 5801f4806..851f001e0 100644
--- a/compaction/compaction.cc
+++ b/compaction/compaction.cc
@@ -344,11 +344,9 @@ struct compaction_read_monitor_generator final : public read_monitor_generator {
return _last_position_seen;
}

- void remove_sstable(bool is_tracking) {
- if (is_tracking && _sst) {
+ void remove_sstable() {
+ if (_sst) {
_cf.get_compaction_strategy().get_backlog_tracker().remove_sstable(_sst);
- } else if (_sst) {
- _cf.get_compaction_strategy().get_backlog_tracker().revert_charges(_sst);
}
_sst = {};
}
@@ -375,17 +373,17 @@ struct compaction_read_monitor_generator final : public read_monitor_generator {
explicit compaction_read_monitor_generator(column_family& cf)
: _cf(cf) {}

- void remove_sstables(bool is_tracking) {
+ void remove_sstables() {
for (auto& rm : _generated_monitors | boost::adaptors::map_values) {
- rm.remove_sstable(is_tracking);
+ rm.remove_sstable();
}
}

- void remove_exhausted_sstables(bool is_tracking, const std::vector<sstables::shared_sstable>& exhausted_sstables) {
+ void remove_exhausted_sstables(const std::vector<sstables::shared_sstable>& exhausted_sstables) {
for (auto &sst : exhausted_sstables) {
auto it = _generated_monitors.find(sst);
if (it != _generated_monitors.end()) {
- it->second.remove_sstable(is_tracking);
+ it->second.remove_sstable();
_generated_monitors.erase(it);
}
}
@@ -992,7 +990,7 @@ class regular_compaction : public compaction {
}

void backlog_tracker_adjust_charges() override {
- _monitor_generator.remove_sstables(_info->tracking);
+ _monitor_generator.remove_sstables();
auto& tracker = _cf.get_compaction_strategy().get_backlog_tracker();
for (auto& sst : _unused_sstables) {
tracker.add_sstable(sst);
@@ -1042,7 +1040,7 @@ class regular_compaction : public compaction {
// an early sstable replacement.
//

- _monitor_generator.remove_exhausted_sstables(_info->tracking, exhausted_sstables);
+ _monitor_generator.remove_exhausted_sstables(exhausted_sstables);
auto& tracker = _cf.get_compaction_strategy().get_backlog_tracker();
for (auto& sst : _unused_sstables) {
tracker.add_sstable(sst);
diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc
index f50e339b6..9e7d1e369 100644
--- a/compaction/compaction_manager.cc
+++ b/compaction/compaction_manager.cc
@@ -940,14 +940,6 @@ future<> compaction_manager::remove(column_family* cf) {
});
}

-void compaction_manager::stop_tracking_ongoing_compactions(column_family* cf) {
- for (auto& info : _compactions) {
- if (info->cf == cf) {
- info->stop_tracking();
- }
- }
-}
-
void compaction_manager::stop_compaction(sstring type) {
sstables::compaction_type target_type;
try {
diff --git a/table.cc b/table.cc
index 6fee87860..1df7aa272 100644
--- a/table.cc
+++ b/table.cc
@@ -1036,10 +1036,6 @@ void table::set_compaction_strategy(sstables::compaction_strategy_type strategy)
new_sstables.insert(s);
});

- if (!move_read_charges) {
- _compaction_manager.stop_tracking_ongoing_compactions(this);
- }
-
// now exception safe:
_compaction_strategy = std::move(new_cs);
_main_sstables = std::move(new_sstables);
diff --git a/test/boost/sstable_compaction_test.cc b/test/boost/sstable_compaction_test.cc
index 62e38a2a9..3c0526343 100644
--- a/test/boost/sstable_compaction_test.cc
+++ b/test/boost/sstable_compaction_test.cc
@@ -2955,11 +2955,11 @@ SEASTAR_TEST_CASE(compaction_strategy_aware_major_compaction_test) {
});
}

-SEASTAR_TEST_CASE(backlog_tracker_correctness_after_stop_tracking_compaction) {
+SEASTAR_TEST_CASE(backlog_tracker_correctness_after_changing_compaction_strategy) {
return test_env::do_with_async([] (test_env& env) {
cell_locker_stats cl_stats;

- auto builder = schema_builder("tests", "backlog_correctness_after_stop_tracking_compaction")
+ auto builder = schema_builder("tests", "backlog_tracker_correctness_after_changing_compaction_strategy")
.with_column("id", utf8_type, column_kind::partition_key)
.with_column("value", int32_type);
auto s = builder.build();
@@ -3001,15 +3001,7 @@ SEASTAR_TEST_CASE(backlog_tracker_correctness_after_stop_tracking_compaction) {

auto fut = compact_sstables(sstables::compaction_descriptor(ssts, cf->get_sstable_set(), default_priority_class()), *cf, sst_gen);

- bool stopped_tracking = false;
- for (auto& info : cf._data->cm.get_compactions()) {
- if (info->cf == &*cf) {
- info->stop_tracking();
- stopped_tracking = true;
- }
- }
- BOOST_REQUIRE(stopped_tracking);
-
+ // set_compaction_strategy() itself is responsible for transferring charges from old to new backlog tracker.
cf->set_compaction_strategy(sstables::compaction_strategy_type::time_window);
for (auto& sst : ssts) {
cf->get_compaction_strategy().get_backlog_tracker().add_sstable(sst);
@@ -3017,7 +3009,6 @@ SEASTAR_TEST_CASE(backlog_tracker_correctness_after_stop_tracking_compaction) {

auto ret = fut.get0();
BOOST_REQUIRE(ret.new_sstables.size() == 1);
- BOOST_REQUIRE(ret.tracking == false);
}
// triggers code that iterates through registered compactions.
cf._data->cm.backlog();
--
2.31.1

Raphael S. Carvalho

unread,
Sep 20, 2021, 4:17:03 PMSep 20
to scylla...@googlegroups.com, Raphael S. Carvalho
Currently the following can happen:
1) there's ongoing compaction with input sstable A, so sstable set
and backlog tracker both contains A.
2) ongoing compaction replaces input sstable A by B, so sstable set
contains only B now.
3) schema is updated, so a new backlog tracker is built without A
because sstable set now contains only B.
4) ongoing compaction tries to remove A from tracker, but it was
excluded in step 3.
5) tracker can now have a negative value if table is decreasing in
size, which leads to log(<negative number>) == -NaN

This problem happens because backlog tracker updates are decoupled
from sstable set updates. Given that the essential content of the
backlog tracker should be the same as that of the sstable set, let's
move tracker management to table.
Whenever sstable set is updated, backlog tracker will be updated with
the same changes, making their management less error prone.

Fixes #9157

Signed-off-by: Raphael S. Carvalho <raph...@scylladb.com>
---
database.hh | 2 ++
compaction/compaction.cc | 50 +---------------------------------------
table.cc | 13 +++++++++++
3 files changed, 16 insertions(+), 49 deletions(-)

diff --git a/database.hh b/database.hh
index 898df400f..1e378bacf 100644
--- a/database.hh
+++ b/database.hh
@@ -550,6 +550,8 @@ class table : public enable_lw_shared_from_this<table> {
void add_maintenance_sstable(sstables::shared_sstable sst);
static void add_sstable_to_backlog_tracker(compaction_backlog_tracker& tracker, sstables::shared_sstable sstable);
static void remove_sstable_from_backlog_tracker(compaction_backlog_tracker& tracker, sstables::shared_sstable sstable);
+ // Update compaction backlog tracker with the same changes applied to the underlying sstable set.
+ void backlog_tracker_adjust_charges(const std::vector<sstables::shared_sstable>& old_sstables, const std::vector<sstables::shared_sstable>& new_sstables);
lw_shared_ptr<memtable> new_memtable();
future<stop_iteration> try_flush_memtable_to_sstable(lw_shared_ptr<memtable> memt, sstable_write_permit&& permit);
// Caller must keep m alive.
diff --git a/compaction/compaction.cc b/compaction/compaction.cc
index 851f001e0..0ac95acc0 100644
--- a/compaction/compaction.cc
+++ b/compaction/compaction.cc
@@ -344,13 +344,6 @@ struct compaction_read_monitor_generator final : public read_monitor_generator {
return _last_position_seen;
}

- void remove_sstable() {
- if (_sst) {
- _cf.get_compaction_strategy().get_backlog_tracker().remove_sstable(_sst);
- }
- _sst = {};
- }
-
compaction_read_monitor(sstables::shared_sstable sst, column_family &cf)
: _sst(std::move(sst)), _cf(cf) { }

@@ -373,17 +366,10 @@ struct compaction_read_monitor_generator final : public read_monitor_generator {
explicit compaction_read_monitor_generator(column_family& cf)
: _cf(cf) {}

- void remove_sstables() {
- for (auto& rm : _generated_monitors | boost::adaptors::map_values) {
- rm.remove_sstable();
- }
- }
-
void remove_exhausted_sstables(const std::vector<sstables::shared_sstable>& exhausted_sstables) {
for (auto &sst : exhausted_sstables) {
auto it = _generated_monitors.find(sst);
if (it != _generated_monitors.end()) {
- it->second.remove_sstable();
_generated_monitors.erase(it);
}
}
@@ -750,8 +736,6 @@ class compaction {
std::chrono::duration_cast<std::chrono::milliseconds>(duration).count(), pretty_printed_throughput(_info->end_size, duration),
_info->total_partitions, _info->total_keys_written);

- backlog_tracker_adjust_charges();
-
auto info = std::move(_info);
_cf.get_compaction_manager().deregister_compaction(info);
return std::move(*info);
@@ -759,7 +743,6 @@ class compaction {

virtual std::string_view report_start_desc() const = 0;
virtual std::string_view report_finish_desc() const = 0;
- virtual void backlog_tracker_adjust_charges() { };

std::function<api::timestamp_type(const dht::decorated_key&)> max_purgeable_func() {
if (!tombstone_expiration_enabled()) {
@@ -961,7 +944,6 @@ class reshape_compaction : public compaction {
class regular_compaction : public compaction {
// sstable being currently written.
mutable compaction_read_monitor_generator _monitor_generator;
- std::vector<shared_sstable> _unused_sstables = {};
public:
regular_compaction(column_family& cf, compaction_descriptor descriptor)
: compaction(cf, std::move(descriptor))
@@ -989,19 +971,9 @@ class regular_compaction : public compaction {
return "Compacted";
}

- void backlog_tracker_adjust_charges() override {
- _monitor_generator.remove_sstables();
- auto& tracker = _cf.get_compaction_strategy().get_backlog_tracker();
- for (auto& sst : _unused_sstables) {
- tracker.add_sstable(sst);
- }
- _unused_sstables.clear();
- }
-
virtual compaction_writer create_compaction_writer(const dht::decorated_key& dk) override {
auto sst = _sstable_creator(this_shard_id());
setup_new_sstable(sst);
- _unused_sstables.push_back(sst);

auto monitor = std::make_unique<compaction_write_monitor>(sst, _cf, maximum_timestamp(), _sstable_level);
sstable_writer_config cfg = make_sstable_writer_config(_info->type);
@@ -1030,24 +1002,6 @@ class regular_compaction : public compaction {
_monitor_generator(std::move(sstable));
}
private:
- void backlog_tracker_incrementally_adjust_charges(std::vector<shared_sstable> exhausted_sstables) {
- //
- // Notify backlog tracker of an early sstable replacement triggered by incremental compaction approach.
- // Backlog tracker will be told that the exhausted sstables aren't being compacted anymore, and the
- // new sstables, which replaced the exhausted ones, are not partially written sstables and they can
- // be added to tracker like any other regular sstable in the table's set.
- // This way we prevent bogus calculation of backlog due to lack of charge adjustment whenever there's
- // an early sstable replacement.
- //
-
- _monitor_generator.remove_exhausted_sstables(exhausted_sstables);
- auto& tracker = _cf.get_compaction_strategy().get_backlog_tracker();
- for (auto& sst : _unused_sstables) {
- tracker.add_sstable(sst);
- }
- _unused_sstables.clear();
- }
-
void maybe_replace_exhausted_sstables_by_sst(shared_sstable sst) {
// Skip earlier replacement of exhausted sstables if compaction works with only single-fragment runs,
// meaning incremental compaction is disabled for this compaction.
@@ -1080,7 +1034,7 @@ class regular_compaction : public compaction {
auto exhausted_ssts = std::vector<shared_sstable>(exhausted, _sstables.end());
_replacer(get_compaction_completion_desc(exhausted_ssts, std::move(_new_unused_sstables)));
_sstables.erase(exhausted, _sstables.end());
- backlog_tracker_incrementally_adjust_charges(std::move(exhausted_ssts));
+ _monitor_generator.remove_exhausted_sstables(exhausted_ssts);
}
}

@@ -1561,8 +1515,6 @@ class resharding_compaction final : public compaction {
return "Resharded";
}

- void backlog_tracker_adjust_charges() override { }
-
compaction_writer create_compaction_writer(const dht::decorated_key& dk) override {
auto shard = dht::shard_of(*_schema, dk.token());
auto sst = _sstable_creator(shard);
diff --git a/table.cc b/table.cc
index 1df7aa272..9e7341721 100644
--- a/table.cc
+++ b/table.cc
@@ -336,6 +336,16 @@ inline void table::remove_sstable_from_backlog_tracker(compaction_backlog_tracke
tracker.remove_sstable(std::move(sstable));
}

+void table::backlog_tracker_adjust_charges(const std::vector<sstables::shared_sstable>& old_sstables, const std::vector<sstables::shared_sstable>& new_sstables) {
+ auto& tracker = _compaction_strategy.get_backlog_tracker();
+ for (auto& sst : new_sstables) {
+ tracker.add_sstable(sst);
+ }
+ for (auto& sst : old_sstables) {
+ tracker.remove_sstable(sst);
+ }
+}
+
lw_shared_ptr<sstables::sstable_set>
table::do_add_sstable(lw_shared_ptr<sstables::sstable_set> sstables, sstables::shared_sstable sstable,
enable_backlog_tracker backlog_tracker) {
@@ -783,6 +793,8 @@ table::update_sstable_lists_on_off_strategy_completion(const std::vector<sstable
_t._main_sstables = std::move(_new_main_list);
_t._maintenance_sstables = std::move(_new_maintenance_list);
_t.refresh_compound_sstable_set();
+ // Input sstables aren't removed from the backlog tracker because they come from the maintenance set.
+ _t.backlog_tracker_adjust_charges({}, _new_main);
}
static std::unique_ptr<row_cache::external_updater_impl> make(table& t, sstable_list_builder::permit_t permit, const sstables_t& old_maintenance, const sstables_t& new_main) {
return std::make_unique<sstable_lists_updater>(t, std::move(permit), old_maintenance, new_main);
@@ -854,6 +866,7 @@ table::on_compaction_completion(sstables::compaction_completion_desc& desc) {

Benny Halevy

unread,
Sep 21, 2021, 2:09:55 AMSep 21
to Raphael S. Carvalho, scylla...@googlegroups.com
LGTM

Commit Bot

unread,
Sep 22, 2021, 8:08:26 AMSep 22
to scylla...@googlegroups.com, Raphael S. Carvalho
From: Raphael S. Carvalho <raph...@scylladb.com>
Committer: Raphael S. Carvalho <raph...@scylladb.com>
Branch: next

compaction: simplify removal of monitors

by switching to unordered_map, removal of generated monitors is
made easier. this is a preparatory change for patch which will
remove monitor for all exhausted sstables

Signed-off-by: Raphael S. Carvalho <raph...@scylladb.com>

---
diff --git a/compaction/compaction.cc b/compaction/compaction.cc
--- a/compaction/compaction.cc
+++ b/compaction/compaction.cc
@@ -368,30 +368,29 @@ struct compaction_read_monitor_generator final : public read_monitor_generator {
};

virtual sstables::read_monitor& operator()(sstables::shared_sstable sst) override {
- _generated_monitors.emplace_back(std::move(sst), _cf);
- return _generated_monitors.back();
+ auto p = _generated_monitors.emplace(sst, compaction_read_monitor(sst, _cf));
+ return p.first->second;
}

explicit compaction_read_monitor_generator(column_family& cf)
: _cf(cf) {}

void remove_sstables(bool is_tracking) {
- for (auto& rm : _generated_monitors) {
+ for (auto& rm : _generated_monitors | boost::adaptors::map_values) {
rm.remove_sstable(is_tracking);
}
}

void remove_sstable(bool is_tracking, sstables::shared_sstable& sst) {
- for (auto& rm : _generated_monitors) {
- if (rm._sst == sst) {
- rm.remove_sstable(is_tracking);
- break;
- }
+ auto it = _generated_monitors.find(sst);
+ if (it != _generated_monitors.end()) {
+ it->second.remove_sstable(is_tracking);
+ _generated_monitors.erase(it);
}
}

Commit Bot

unread,
Sep 22, 2021, 8:08:27 AMSep 22
to scylla...@googlegroups.com, Raphael S. Carvalho
From: Raphael S. Carvalho <raph...@scylladb.com>
Committer: Raphael S. Carvalho <raph...@scylladb.com>
Branch: next

compaction: introduce compaction_read_monitor_generator::remove_exhausted_sstables()

This new function makes it easier to remove monitor of exhausted
sstables.

Signed-off-by: Raphael S. Carvalho <raph...@scylladb.com>

---
diff --git a/compaction/compaction.cc b/compaction/compaction.cc
--- a/compaction/compaction.cc
+++ b/compaction/compaction.cc
@@ -381,11 +381,13 @@ struct compaction_read_monitor_generator final : public read_monitor_generator {
}
}

- void remove_sstable(bool is_tracking, sstables::shared_sstable& sst) {
- auto it = _generated_monitors.find(sst);
- if (it != _generated_monitors.end()) {
- it->second.remove_sstable(is_tracking);
- _generated_monitors.erase(it);
+ void remove_exhausted_sstables(bool is_tracking, const std::vector<sstables::shared_sstable>& exhausted_sstables) {
+ for (auto &sst : exhausted_sstables) {
+ auto it = _generated_monitors.find(sst);
+ if (it != _generated_monitors.end()) {
+ it->second.remove_sstable(is_tracking);
+ _generated_monitors.erase(it);
+ }
}
}
private:
@@ -1040,9 +1042,7 @@ class regular_compaction : public compaction {
// an early sstable replacement.
//

- for (auto& sst : exhausted_sstables) {
- _monitor_generator.remove_sstable(_info->tracking, sst);
- }
+ _monitor_generator.remove_exhausted_sstables(_info->tracking, exhausted_sstables);

Commit Bot

unread,
Sep 22, 2021, 8:08:29 AMSep 22
to scylla...@googlegroups.com, Raphael S. Carvalho
From: Raphael S. Carvalho <raph...@scylladb.com>
Committer: Raphael S. Carvalho <raph...@scylladb.com>
Branch: next

compaction: Don't leak backlog of input sstable when compaction strategy is changed

The generic backlog formula is: ALL + PARTIAL - COMPACTING

With transfer_ongoing_charges() we already ignore the effect of
ongoing compactions on COMPACTING as we judge them to be pointless.

But ongoing compactions will run to completion, meaning that output
sstables will be added to ALL anyway, in the formula above.

With stop_tracking_ongoing_compactions(), input sstables are never
removed from the tracker, but output sstables are added, which means
we end up with duplicate backlog in the tracker.

By removing this tracking mechanism, pointless ongoing compaction
will be ignored as expected and the leaks will be fixed.

Later, the intention is to force a stop on ongoing compactions if
strategy has changed as they're pointless anyway.

Signed-off-by: Raphael S. Carvalho <raph...@scylladb.com>

---
diff --git a/compaction/compaction.cc b/compaction/compaction.cc
--- a/compaction/compaction.cc
+++ b/compaction/compaction.cc
@@ -344,11 +344,9 @@ struct compaction_read_monitor_generator final : public read_monitor_generator {
return _last_position_seen;
}

- void remove_sstable(bool is_tracking) {
- if (is_tracking && _sst) {
+ void remove_sstable() {
+ if (_sst) {
_cf.get_compaction_strategy().get_backlog_tracker().remove_sstable(_sst);
- } else if (_sst) {
- _cf.get_compaction_strategy().get_backlog_tracker().revert_charges(_sst);
}
_sst = {};
}
@@ -375,17 +373,17 @@ struct compaction_read_monitor_generator final : public read_monitor_generator {
explicit compaction_read_monitor_generator(column_family& cf)
: _cf(cf) {}

- void remove_sstables(bool is_tracking) {
+ void remove_sstables() {
for (auto& rm : _generated_monitors | boost::adaptors::map_values) {
- rm.remove_sstable(is_tracking);
+ rm.remove_sstable();
}
}

- void remove_exhausted_sstables(bool is_tracking, const std::vector<sstables::shared_sstable>& exhausted_sstables) {
+ void remove_exhausted_sstables(const std::vector<sstables::shared_sstable>& exhausted_sstables) {
for (auto &sst : exhausted_sstables) {
auto it = _generated_monitors.find(sst);
if (it != _generated_monitors.end()) {
- it->second.remove_sstable(is_tracking);
+ it->second.remove_sstable();
_generated_monitors.erase(it);
}
}
@@ -992,7 +990,7 @@ class regular_compaction : public compaction {
}

void backlog_tracker_adjust_charges() override {
- _monitor_generator.remove_sstables(_info->tracking);
+ _monitor_generator.remove_sstables();
auto& tracker = _cf.get_compaction_strategy().get_backlog_tracker();
for (auto& sst : _unused_sstables) {
tracker.add_sstable(sst);
@@ -1042,7 +1040,7 @@ class regular_compaction : public compaction {
// an early sstable replacement.
//

- _monitor_generator.remove_exhausted_sstables(_info->tracking, exhausted_sstables);
+ _monitor_generator.remove_exhausted_sstables(exhausted_sstables);
auto& tracker = _cf.get_compaction_strategy().get_backlog_tracker();
for (auto& sst : _unused_sstables) {
tracker.add_sstable(sst);
diff --git a/compaction/compaction.hh b/compaction/compaction.hh
--- a/compaction/compaction.hh
+++ b/compaction/compaction.hh
@@ -66,7 +66,6 @@ namespace sstables {
int64_t ended_at;
std::vector<shared_sstable> new_sstables;
sstring stop_requested;
- bool tracking = true;
utils::UUID run_identifier;
utils::UUID compaction_uuid;
struct replacement {
@@ -82,10 +81,6 @@ namespace sstables {
void stop(sstring reason) {
stop_requested = std::move(reason);
}
-
- void stop_tracking() {
- tracking = false;
- }
};

// Compact a list of N sstables into M sstables.
diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc
--- a/compaction/compaction_manager.cc
+++ b/compaction/compaction_manager.cc
@@ -940,14 +940,6 @@ future<> compaction_manager::remove(column_family* cf) {
});
}

-void compaction_manager::stop_tracking_ongoing_compactions(column_family* cf) {
- for (auto& info : _compactions) {
- if (info->cf == cf) {
- info->stop_tracking();
- }
- }
-}
-
void compaction_manager::stop_compaction(sstring type) {
sstables::compaction_type target_type;
try {
diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh
--- a/compaction/compaction_manager.hh
+++ b/compaction/compaction_manager.hh
@@ -241,10 +241,6 @@ public:
// Cancel requests on cf and wait for a possible ongoing compaction on cf.
future<> remove(column_family* cf);

- // No longer interested in tracking backlog for compactions in this column
- // family. For instance, we could be ALTERing TABLE to a different strategy.
- void stop_tracking_ongoing_compactions(column_family* cf);
-
const stats& get_stats() const {
return _stats;
}
diff --git a/table.cc b/table.cc
--- a/table.cc
+++ b/table.cc
@@ -1036,10 +1036,6 @@ void table::set_compaction_strategy(sstables::compaction_strategy_type strategy)
new_sstables.insert(s);
});

- if (!move_read_charges) {
- _compaction_manager.stop_tracking_ongoing_compactions(this);
- }
-
// now exception safe:
_compaction_strategy = std::move(new_cs);
_main_sstables = std::move(new_sstables);
diff --git a/test/boost/sstable_compaction_test.cc b/test/boost/sstable_compaction_test.cc
--- a/test/boost/sstable_compaction_test.cc
+++ b/test/boost/sstable_compaction_test.cc
@@ -2955,11 +2955,11 @@ SEASTAR_TEST_CASE(compaction_strategy_aware_major_compaction_test) {
});
}

-SEASTAR_TEST_CASE(backlog_tracker_correctness_after_stop_tracking_compaction) {
+SEASTAR_TEST_CASE(backlog_tracker_correctness_after_changing_compaction_strategy) {
return test_env::do_with_async([] (test_env& env) {
cell_locker_stats cl_stats;

- auto builder = schema_builder("tests", "backlog_correctness_after_stop_tracking_compaction")
+ auto builder = schema_builder("tests", "backlog_tracker_correctness_after_changing_compaction_strategy")
.with_column("id", utf8_type, column_kind::partition_key)
.with_column("value", int32_type);
auto s = builder.build();
@@ -3001,23 +3001,14 @@ SEASTAR_TEST_CASE(backlog_tracker_correctness_after_stop_tracking_compaction) {

auto fut = compact_sstables(sstables::compaction_descriptor(ssts, cf->get_sstable_set(), default_priority_class()), *cf, sst_gen);

- bool stopped_tracking = false;
- for (auto& info : cf._data->cm.get_compactions()) {
- if (info->cf == &*cf) {
- info->stop_tracking();
- stopped_tracking = true;
- }
- }
- BOOST_REQUIRE(stopped_tracking);
-
+ // set_compaction_strategy() itself is responsible for transferring charges from old to new backlog tracker.
cf->set_compaction_strategy(sstables::compaction_strategy_type::time_window);
for (auto& sst : ssts) {
cf->get_compaction_strategy().get_backlog_tracker().add_sstable(sst);
}

Commit Bot

unread,
Sep 22, 2021, 8:08:30 AMSep 22
to scylla...@googlegroups.com, Raphael S. Carvalho
From: Raphael S. Carvalho <raph...@scylladb.com>
Committer: Raphael S. Carvalho <raph...@scylladb.com>
Branch: next

compaction: Update backlog tracker correctly when schema is updated

Currently the following can happen:
1) there's ongoing compaction with input sstable A, so sstable set
and backlog tracker both contains A.
2) ongoing compaction replaces input sstable A by B, so sstable set
contains only B now.
3) schema is updated, so a new backlog tracker is built without A
because sstable set now contains only B.
4) ongoing compaction tries to remove A from tracker, but it was
excluded in step 3.
5) tracker can now have a negative value if table is decreasing in
size, which leads to log(<negative number>) == -NaN

This problem happens because backlog tracker updates are decoupled
from sstable set updates. Given that the essential content of the
backlog tracker should be the same as that of the sstable set, let's
move tracker management to table.
Whenever sstable set is updated, backlog tracker will be updated with
the same changes, making their management less error prone.

Fixes #9157

Signed-off-by: Raphael S. Carvalho <raph...@scylladb.com>

---
diff --git a/compaction/compaction.cc b/compaction/compaction.cc
--- a/compaction/compaction.cc
+++ b/compaction/compaction.cc
@@ -344,13 +344,6 @@ struct compaction_read_monitor_generator final : public read_monitor_generator {
return _last_position_seen;
}

- void remove_sstable() {
- if (_sst) {
- _cf.get_compaction_strategy().get_backlog_tracker().remove_sstable(_sst);
- }
- _sst = {};
- }
-
compaction_read_monitor(sstables::shared_sstable sst, column_family &cf)
: _sst(std::move(sst)), _cf(cf) { }

@@ -373,17 +366,10 @@ struct compaction_read_monitor_generator final : public read_monitor_generator {
explicit compaction_read_monitor_generator(column_family& cf)
: _cf(cf) {}

- void remove_sstables() {
- for (auto& rm : _generated_monitors | boost::adaptors::map_values) {
- rm.remove_sstable();
- }
- }
-
void remove_exhausted_sstables(const std::vector<sstables::shared_sstable>& exhausted_sstables) {
for (auto &sst : exhausted_sstables) {
auto it = _generated_monitors.find(sst);
if (it != _generated_monitors.end()) {
- it->second.remove_sstable();
_generated_monitors.erase(it);
}
}
@@ -750,16 +736,13 @@ class compaction {
std::chrono::duration_cast<std::chrono::milliseconds>(duration).count(), pretty_printed_throughput(_info->end_size, duration),
_info->total_partitions, _info->total_keys_written);

- backlog_tracker_adjust_charges();
-
auto info = std::move(_info);
_cf.get_compaction_manager().deregister_compaction(info);
return std::move(*info);
}

virtual std::string_view report_start_desc() const = 0;
virtual std::string_view report_finish_desc() const = 0;
- virtual void backlog_tracker_adjust_charges() { };

std::function<api::timestamp_type(const dht::decorated_key&)> max_purgeable_func() {
if (!tombstone_expiration_enabled()) {
@@ -961,7 +944,6 @@ class reshape_compaction : public compaction {
class regular_compaction : public compaction {
diff --git a/database.hh b/database.hh
--- a/database.hh
+++ b/database.hh
@@ -550,6 +550,8 @@ private:
void add_maintenance_sstable(sstables::shared_sstable sst);
static void add_sstable_to_backlog_tracker(compaction_backlog_tracker& tracker, sstables::shared_sstable sstable);
static void remove_sstable_from_backlog_tracker(compaction_backlog_tracker& tracker, sstables::shared_sstable sstable);
+ // Update compaction backlog tracker with the same changes applied to the underlying sstable set.
+ void backlog_tracker_adjust_charges(const std::vector<sstables::shared_sstable>& old_sstables, const std::vector<sstables::shared_sstable>& new_sstables);
lw_shared_ptr<memtable> new_memtable();
future<stop_iteration> try_flush_memtable_to_sstable(lw_shared_ptr<memtable> memt, sstable_write_permit&& permit);
// Caller must keep m alive.
diff --git a/table.cc b/table.cc
--- a/table.cc
+++ b/table.cc

Commit Bot

unread,
Sep 22, 2021, 6:45:17 PMSep 22
to scylla...@googlegroups.com, Raphael S. Carvalho
From: Raphael S. Carvalho <raph...@scylladb.com>
Committer: Raphael S. Carvalho <raph...@scylladb.com>
Branch: master

compaction: simplify removal of monitors

by switching to unordered_map, removal of generated monitors is
made easier. this is a preparatory change for patch which will
remove monitor for all exhausted sstables

Signed-off-by: Raphael S. Carvalho <raph...@scylladb.com>

---
diff --git a/compaction/compaction.cc b/compaction/compaction.cc
--- a/compaction/compaction.cc
+++ b/compaction/compaction.cc
@@ -368,30 +368,29 @@ struct compaction_read_monitor_generator final : public read_monitor_generator {
};

virtual sstables::read_monitor& operator()(sstables::shared_sstable sst) override {
- _generated_monitors.emplace_back(std::move(sst), _cf);
- return _generated_monitors.back();
+ auto p = _generated_monitors.emplace(sst, compaction_read_monitor(sst, _cf));
+ return p.first->second;
}

explicit compaction_read_monitor_generator(column_family& cf)
: _cf(cf) {}

void remove_sstables(bool is_tracking) {
- for (auto& rm : _generated_monitors) {
+ for (auto& rm : _generated_monitors | boost::adaptors::map_values) {
rm.remove_sstable(is_tracking);
}
}

void remove_sstable(bool is_tracking, sstables::shared_sstable& sst) {
- for (auto& rm : _generated_monitors) {
- if (rm._sst == sst) {
- rm.remove_sstable(is_tracking);
- break;
- }
+ auto it = _generated_monitors.find(sst);
+ if (it != _generated_monitors.end()) {
+ it->second.remove_sstable(is_tracking);
+ _generated_monitors.erase(it);
}
}

Commit Bot

unread,
Sep 22, 2021, 6:45:18 PMSep 22
to scylla...@googlegroups.com, Raphael S. Carvalho
From: Raphael S. Carvalho <raph...@scylladb.com>
Committer: Raphael S. Carvalho <raph...@scylladb.com>
Branch: master

compaction: introduce compaction_read_monitor_generator::remove_exhausted_sstables()

This new function makes it easier to remove monitor of exhausted
sstables.

Signed-off-by: Raphael S. Carvalho <raph...@scylladb.com>

---
diff --git a/compaction/compaction.cc b/compaction/compaction.cc
--- a/compaction/compaction.cc
+++ b/compaction/compaction.cc
@@ -381,11 +381,13 @@ struct compaction_read_monitor_generator final : public read_monitor_generator {
}
}

- void remove_sstable(bool is_tracking, sstables::shared_sstable& sst) {
- auto it = _generated_monitors.find(sst);
- if (it != _generated_monitors.end()) {
- it->second.remove_sstable(is_tracking);
- _generated_monitors.erase(it);
+ void remove_exhausted_sstables(bool is_tracking, const std::vector<sstables::shared_sstable>& exhausted_sstables) {
+ for (auto &sst : exhausted_sstables) {
+ auto it = _generated_monitors.find(sst);
+ if (it != _generated_monitors.end()) {
+ it->second.remove_sstable(is_tracking);
+ _generated_monitors.erase(it);
+ }
}
}
private:
@@ -1040,9 +1042,7 @@ class regular_compaction : public compaction {
// an early sstable replacement.
//

- for (auto& sst : exhausted_sstables) {
- _monitor_generator.remove_sstable(_info->tracking, sst);
- }
+ _monitor_generator.remove_exhausted_sstables(_info->tracking, exhausted_sstables);

Commit Bot

unread,
Sep 22, 2021, 6:45:19 PMSep 22
to scylla...@googlegroups.com, Raphael S. Carvalho
From: Raphael S. Carvalho <raph...@scylladb.com>
Committer: Raphael S. Carvalho <raph...@scylladb.com>
Branch: master

compaction: Don't leak backlog of input sstable when compaction strategy is changed

The generic backlog formula is: ALL + PARTIAL - COMPACTING

With transfer_ongoing_charges() we already ignore the effect of
ongoing compactions on COMPACTING as we judge them to be pointless.

But ongoing compactions will run to completion, meaning that output
sstables will be added to ALL anyway, in the formula above.

With stop_tracking_ongoing_compactions(), input sstables are never
removed from the tracker, but output sstables are added, which means
we end up with duplicate backlog in the tracker.

By removing this tracking mechanism, pointless ongoing compaction
will be ignored as expected and the leaks will be fixed.

Later, the intention is to force a stop on ongoing compactions if
strategy has changed as they're pointless anyway.

Signed-off-by: Raphael S. Carvalho <raph...@scylladb.com>

---
diff --git a/compaction/compaction.cc b/compaction/compaction.cc
--- a/compaction/compaction.cc
+++ b/compaction/compaction.cc
@@ -344,11 +344,9 @@ struct compaction_read_monitor_generator final : public read_monitor_generator {
return _last_position_seen;
}

- void remove_sstable(bool is_tracking) {
- if (is_tracking && _sst) {
+ void remove_sstable() {
+ if (_sst) {
_cf.get_compaction_strategy().get_backlog_tracker().remove_sstable(_sst);
- } else if (_sst) {
- _cf.get_compaction_strategy().get_backlog_tracker().revert_charges(_sst);
}
_sst = {};
}
@@ -375,17 +373,17 @@ struct compaction_read_monitor_generator final : public read_monitor_generator {
explicit compaction_read_monitor_generator(column_family& cf)
: _cf(cf) {}

- void remove_sstables(bool is_tracking) {
+ void remove_sstables() {
for (auto& rm : _generated_monitors | boost::adaptors::map_values) {
- rm.remove_sstable(is_tracking);
+ rm.remove_sstable();
}
}

- void remove_exhausted_sstables(bool is_tracking, const std::vector<sstables::shared_sstable>& exhausted_sstables) {
+ void remove_exhausted_sstables(const std::vector<sstables::shared_sstable>& exhausted_sstables) {
for (auto &sst : exhausted_sstables) {
auto it = _generated_monitors.find(sst);
if (it != _generated_monitors.end()) {
- it->second.remove_sstable(is_tracking);
+ it->second.remove_sstable();
_generated_monitors.erase(it);
}
}
@@ -992,7 +990,7 @@ class regular_compaction : public compaction {
}

void backlog_tracker_adjust_charges() override {
- _monitor_generator.remove_sstables(_info->tracking);
+ _monitor_generator.remove_sstables();
auto& tracker = _cf.get_compaction_strategy().get_backlog_tracker();
for (auto& sst : _unused_sstables) {
tracker.add_sstable(sst);
@@ -1042,7 +1040,7 @@ class regular_compaction : public compaction {
// an early sstable replacement.
//

- _monitor_generator.remove_exhausted_sstables(_info->tracking, exhausted_sstables);
+ _monitor_generator.remove_exhausted_sstables(exhausted_sstables);
auto& tracker = _cf.get_compaction_strategy().get_backlog_tracker();
for (auto& sst : _unused_sstables) {
tracker.add_sstable(sst);
diff --git a/table.cc b/table.cc
--- a/table.cc
+++ b/table.cc

Commit Bot

unread,
Sep 22, 2021, 6:45:21 PMSep 22
to scylla...@googlegroups.com, Raphael S. Carvalho
From: Raphael S. Carvalho <raph...@scylladb.com>
Committer: Raphael S. Carvalho <raph...@scylladb.com>
Branch: master

compaction: Update backlog tracker correctly when schema is updated

Currently the following can happen:
1) there's ongoing compaction with input sstable A, so sstable set
and backlog tracker both contains A.
2) ongoing compaction replaces input sstable A by B, so sstable set
contains only B now.
3) schema is updated, so a new backlog tracker is built without A
because sstable set now contains only B.
4) ongoing compaction tries to remove A from tracker, but it was
excluded in step 3.
5) tracker can now have a negative value if table is decreasing in
size, which leads to log(<negative number>) == -NaN

This problem happens because backlog tracker updates are decoupled
from sstable set updates. Given that the essential content of the
backlog tracker should be the same as that of the sstable set, let's
move tracker management to table.
Whenever sstable set is updated, backlog tracker will be updated with
the same changes, making their management less error prone.

Fixes #9157

Signed-off-by: Raphael S. Carvalho <raph...@scylladb.com>

---
diff --git a/compaction/compaction.cc b/compaction/compaction.cc
--- a/compaction/compaction.cc
+++ b/compaction/compaction.cc
@@ -344,13 +344,6 @@ struct compaction_read_monitor_generator final : public read_monitor_generator {
return _last_position_seen;
}

- void remove_sstable() {
- if (_sst) {
- _cf.get_compaction_strategy().get_backlog_tracker().remove_sstable(_sst);
- }
- _sst = {};
- }
-
compaction_read_monitor(sstables::shared_sstable sst, column_family &cf)
: _sst(std::move(sst)), _cf(cf) { }

@@ -373,17 +366,10 @@ struct compaction_read_monitor_generator final : public read_monitor_generator {
explicit compaction_read_monitor_generator(column_family& cf)
: _cf(cf) {}

- void remove_sstables() {
- for (auto& rm : _generated_monitors | boost::adaptors::map_values) {
- rm.remove_sstable();
- }
- }
-
void remove_exhausted_sstables(const std::vector<sstables::shared_sstable>& exhausted_sstables) {
for (auto &sst : exhausted_sstables) {
auto it = _generated_monitors.find(sst);
if (it != _generated_monitors.end()) {
- it->second.remove_sstable();
_generated_monitors.erase(it);
}
}
@@ -750,16 +736,13 @@ class compaction {
std::chrono::duration_cast<std::chrono::milliseconds>(duration).count(), pretty_printed_throughput(_info->end_size, duration),
_info->total_partitions, _info->total_keys_written);

- backlog_tracker_adjust_charges();
-
auto info = std::move(_info);
_cf.get_compaction_manager().deregister_compaction(info);
return std::move(*info);
}

virtual std::string_view report_start_desc() const = 0;
virtual std::string_view report_finish_desc() const = 0;
- virtual void backlog_tracker_adjust_charges() { };

std::function<api::timestamp_type(const dht::decorated_key&)> max_purgeable_func() {
if (!tombstone_expiration_enabled()) {
@@ -961,7 +944,6 @@ class reshape_compaction : public compaction {
class regular_compaction : public compaction {
diff --git a/table.cc b/table.cc
--- a/table.cc
+++ b/table.cc

Raphael S. Carvalho

unread,
Sep 27, 2021, 2:06:13 PMSep 27
to scylladb-dev, Avi Kivity
please find v3 fixed here: github.com/raphaelsc fixes_to_backlog_tracker_v4

there's only a minor change (please see last patch) which makes
monitors outlive their respective readers, avoiding the previously
seen use-after-free.

tests: mode(debug), manual test which reproduced use-after-free in v3.

Raphael S. Carvalho

unread,
Sep 29, 2021, 6:53:13 AMSep 29
to scylladb-dev, Avi Kivity
Ping

Commit Bot

unread,
Sep 29, 2021, 6:56:11 AMSep 29
to scylla...@googlegroups.com, Raphael S. Carvalho
From: Raphael S. Carvalho <raph...@scylladb.com>
Committer: Raphael S. Carvalho <raph...@scylladb.com>
Branch: next

compaction: simplify removal of monitors

by switching to unordered_map, removal of generated monitors is
made easier. this is a preparatory change for patch which will
remove monitor for all exhausted sstables

Signed-off-by: Raphael S. Carvalho <raph...@scylladb.com>

---
diff --git a/compaction/compaction.cc b/compaction/compaction.cc
--- a/compaction/compaction.cc
+++ b/compaction/compaction.cc
@@ -368,30 +368,28 @@ struct compaction_read_monitor_generator final : public read_monitor_generator {
};

virtual sstables::read_monitor& operator()(sstables::shared_sstable sst) override {
- _generated_monitors.emplace_back(std::move(sst), _cf);
- return _generated_monitors.back();
+ auto p = _generated_monitors.emplace(sst->generation(), compaction_read_monitor(sst, _cf));
+ return p.first->second;
}

explicit compaction_read_monitor_generator(column_family& cf)
: _cf(cf) {}

void remove_sstables(bool is_tracking) {
- for (auto& rm : _generated_monitors) {
+ for (auto& rm : _generated_monitors | boost::adaptors::map_values) {
rm.remove_sstable(is_tracking);
}
}

void remove_sstable(bool is_tracking, sstables::shared_sstable& sst) {
- for (auto& rm : _generated_monitors) {
- if (rm._sst == sst) {
- rm.remove_sstable(is_tracking);
- break;
- }
+ auto it = _generated_monitors.find(sst->generation());
+ if (it != _generated_monitors.end()) {
+ it->second.remove_sstable(is_tracking);
}
}
private:
column_family& _cf;
- std::deque<compaction_read_monitor> _generated_monitors;
+ std::unordered_map<int64_t, compaction_read_monitor> _generated_monitors;

Commit Bot

unread,
Sep 29, 2021, 6:56:12 AMSep 29
to scylla...@googlegroups.com, Raphael S. Carvalho
From: Raphael S. Carvalho <raph...@scylladb.com>
Committer: Raphael S. Carvalho <raph...@scylladb.com>
Branch: next

compaction: introduce compaction_read_monitor_generator::remove_exhausted_sstables()

This new function makes it easier to remove monitor of exhausted
sstables.

Signed-off-by: Raphael S. Carvalho <raph...@scylladb.com>

---
diff --git a/compaction/compaction.cc b/compaction/compaction.cc
--- a/compaction/compaction.cc
+++ b/compaction/compaction.cc
@@ -381,10 +381,12 @@ struct compaction_read_monitor_generator final : public read_monitor_generator {
}
}

- void remove_sstable(bool is_tracking, sstables::shared_sstable& sst) {
- auto it = _generated_monitors.find(sst->generation());
- if (it != _generated_monitors.end()) {
- it->second.remove_sstable(is_tracking);
+ void remove_exhausted_sstables(bool is_tracking, const std::vector<sstables::shared_sstable>& exhausted_sstables) {
+ for (auto& sst : exhausted_sstables) {
+ auto it = _generated_monitors.find(sst->generation());
+ if (it != _generated_monitors.end()) {
+ it->second.remove_sstable(is_tracking);
+ }
}
}
private:
@@ -1039,9 +1041,7 @@ class regular_compaction : public compaction {
// an early sstable replacement.
//

- for (auto& sst : exhausted_sstables) {
- _monitor_generator.remove_sstable(_info->tracking, sst);
- }
+ _monitor_generator.remove_exhausted_sstables(_info->tracking, exhausted_sstables);

Commit Bot

unread,
Sep 29, 2021, 6:56:13 AMSep 29
to scylla...@googlegroups.com, Raphael S. Carvalho
From: Raphael S. Carvalho <raph...@scylladb.com>
Committer: Raphael S. Carvalho <raph...@scylladb.com>
Branch: next

compaction: Don't leak backlog of input sstable when compaction strategy is changed

The generic backlog formula is: ALL + PARTIAL - COMPACTING

With transfer_ongoing_charges() we already ignore the effect of
ongoing compactions on COMPACTING as we judge them to be pointless.

But ongoing compactions will run to completion, meaning that output
sstables will be added to ALL anyway, in the formula above.

With stop_tracking_ongoing_compactions(), input sstables are never
removed from the tracker, but output sstables are added, which means
we end up with duplicate backlog in the tracker.

By removing this tracking mechanism, pointless ongoing compaction
will be ignored as expected and the leaks will be fixed.

Later, the intention is to force a stop on ongoing compactions if
strategy has changed as they're pointless anyway.

Signed-off-by: Raphael S. Carvalho <raph...@scylladb.com>

---
diff --git a/compaction/compaction.cc b/compaction/compaction.cc
--- a/compaction/compaction.cc
+++ b/compaction/compaction.cc
@@ -344,11 +344,9 @@ struct compaction_read_monitor_generator final : public read_monitor_generator {
return _last_position_seen;
}

- void remove_sstable(bool is_tracking) {
- if (is_tracking && _sst) {
+ void remove_sstable() {
+ if (_sst) {
_cf.get_compaction_strategy().get_backlog_tracker().remove_sstable(_sst);
- } else if (_sst) {
- _cf.get_compaction_strategy().get_backlog_tracker().revert_charges(_sst);
}
_sst = {};
}
@@ -375,17 +373,17 @@ struct compaction_read_monitor_generator final : public read_monitor_generator {
explicit compaction_read_monitor_generator(column_family& cf)
: _cf(cf) {}

- void remove_sstables(bool is_tracking) {
+ void remove_sstables() {
for (auto& rm : _generated_monitors | boost::adaptors::map_values) {
- rm.remove_sstable(is_tracking);
+ rm.remove_sstable();
}
}

- void remove_exhausted_sstables(bool is_tracking, const std::vector<sstables::shared_sstable>& exhausted_sstables) {
+ void remove_exhausted_sstables(const std::vector<sstables::shared_sstable>& exhausted_sstables) {
for (auto& sst : exhausted_sstables) {
auto it = _generated_monitors.find(sst->generation());
if (it != _generated_monitors.end()) {
- it->second.remove_sstable(is_tracking);
+ it->second.remove_sstable();
}
}
}
@@ -991,7 +989,7 @@ class regular_compaction : public compaction {
}

void backlog_tracker_adjust_charges() override {
- _monitor_generator.remove_sstables(_info->tracking);
+ _monitor_generator.remove_sstables();
auto& tracker = _cf.get_compaction_strategy().get_backlog_tracker();
for (auto& sst : _unused_sstables) {
tracker.add_sstable(sst);
@@ -1041,7 +1039,7 @@ class regular_compaction : public compaction {
// an early sstable replacement.
//

- _monitor_generator.remove_exhausted_sstables(_info->tracking, exhausted_sstables);
+ _monitor_generator.remove_exhausted_sstables(exhausted_sstables);
auto& tracker = _cf.get_compaction_strategy().get_backlog_tracker();
for (auto& sst : _unused_sstables) {
tracker.add_sstable(sst);
diff --git a/table.cc b/table.cc
--- a/table.cc
+++ b/table.cc
@@ -1037,10 +1037,6 @@ void table::set_compaction_strategy(sstables::compaction_strategy_type strategy)
new_sstables.insert(s);
});

- if (!move_read_charges) {
- _compaction_manager.stop_tracking_ongoing_compactions(this);
- }
-
// now exception safe:
_compaction_strategy = std::move(new_cs);
_main_sstables = std::move(new_sstables);
diff --git a/test/boost/sstable_compaction_test.cc b/test/boost/sstable_compaction_test.cc
--- a/test/boost/sstable_compaction_test.cc
+++ b/test/boost/sstable_compaction_test.cc
@@ -2945,11 +2945,11 @@ SEASTAR_TEST_CASE(compaction_strategy_aware_major_compaction_test) {
});
}

-SEASTAR_TEST_CASE(backlog_tracker_correctness_after_stop_tracking_compaction) {
+SEASTAR_TEST_CASE(backlog_tracker_correctness_after_changing_compaction_strategy) {
return test_env::do_with_async([] (test_env& env) {
cell_locker_stats cl_stats;

- auto builder = schema_builder("tests", "backlog_correctness_after_stop_tracking_compaction")
+ auto builder = schema_builder("tests", "backlog_tracker_correctness_after_changing_compaction_strategy")
.with_column("id", utf8_type, column_kind::partition_key)
.with_column("value", int32_type);
auto s = builder.build();
@@ -2991,23 +2991,14 @@ SEASTAR_TEST_CASE(backlog_tracker_correctness_after_stop_tracking_compaction) {

Commit Bot

unread,
Sep 29, 2021, 6:56:15 AMSep 29
to scylla...@googlegroups.com, Raphael S. Carvalho
From: Raphael S. Carvalho <raph...@scylladb.com>
Committer: Raphael S. Carvalho <raph...@scylladb.com>
Branch: next

compaction: Update backlog tracker correctly when schema is updated

Currently the following can happen:
1) there's ongoing compaction with input sstable A, so sstable set
and backlog tracker both contain A.
2) ongoing compaction replaces input sstable A by B, so sstable set
contains only B now.
3) schema is updated, so a new backlog tracker is built without A
because sstable set now contains only B.
4) ongoing compaction tries to remove A from tracker, but it was
excluded in step 3.
5) tracker can now have a negative value if table is decreasing in
size, which leads to log(<negative number>) == -NaN

This problem happens because backlog tracker updates are decoupled
from sstable set updates. Given that the essential content of the
backlog tracker should be the same as that of the sstable set, let's
move tracker management to table.
Whenever sstable set is updated, backlog tracker will be updated with
the same changes, making their management less error prone.

Fixes #9157

Signed-off-by: Raphael S. Carvalho <raph...@scylladb.com>

---
diff --git a/compaction/compaction.cc b/compaction/compaction.cc
--- a/compaction/compaction.cc
+++ b/compaction/compaction.cc
@@ -346,7 +346,7 @@ struct compaction_read_monitor_generator final : public read_monitor_generator {

void remove_sstable() {
if (_sst) {
- _cf.get_compaction_strategy().get_backlog_tracker().remove_sstable(_sst);
+ _cf.get_compaction_strategy().get_backlog_tracker().revert_charges(_sst);
}
_sst = {};
}
@@ -373,12 +373,6 @@ struct compaction_read_monitor_generator final : public read_monitor_generator {
explicit compaction_read_monitor_generator(column_family& cf)
: _cf(cf) {}

- void remove_sstables() {
- for (auto& rm : _generated_monitors | boost::adaptors::map_values) {
- rm.remove_sstable();
- }
- }
-
void remove_exhausted_sstables(const std::vector<sstables::shared_sstable>& exhausted_sstables) {
for (auto& sst : exhausted_sstables) {
auto it = _generated_monitors.find(sst->generation());
@@ -749,16 +743,13 @@ class compaction {
std::chrono::duration_cast<std::chrono::milliseconds>(duration).count(), pretty_printed_throughput(_info->end_size, duration),
_info->total_partitions, _info->total_keys_written);

- backlog_tracker_adjust_charges();
-
auto info = std::move(_info);
_cf.get_compaction_manager().deregister_compaction(info);
return std::move(*info);
}

virtual std::string_view report_start_desc() const = 0;
virtual std::string_view report_finish_desc() const = 0;
- virtual void backlog_tracker_adjust_charges() { };

std::function<api::timestamp_type(const dht::decorated_key&)> max_purgeable_func() {
if (!tombstone_expiration_enabled()) {
@@ -960,7 +951,6 @@ class reshape_compaction : public compaction {
class regular_compaction : public compaction {
// sstable being currently written.
mutable compaction_read_monitor_generator _monitor_generator;
- std::vector<shared_sstable> _unused_sstables = {};
public:
regular_compaction(column_family& cf, compaction_descriptor descriptor)
: compaction(cf, std::move(descriptor))
@@ -988,19 +978,9 @@ class regular_compaction : public compaction {
return "Compacted";
}

- void backlog_tracker_adjust_charges() override {
- _monitor_generator.remove_sstables();
- auto& tracker = _cf.get_compaction_strategy().get_backlog_tracker();
- for (auto& sst : _unused_sstables) {
- tracker.add_sstable(sst);
- }
- _unused_sstables.clear();
- }
-
virtual compaction_writer create_compaction_writer(const dht::decorated_key& dk) override {
auto sst = _sstable_creator(this_shard_id());
setup_new_sstable(sst);
- _unused_sstables.push_back(sst);

auto monitor = std::make_unique<compaction_write_monitor>(sst, _cf, maximum_timestamp(), _sstable_level);
sstable_writer_config cfg = make_sstable_writer_config(_info->type);
@@ -1029,24 +1009,6 @@ class regular_compaction : public compaction {
_monitor_generator(std::move(sstable));
}
private:
- void backlog_tracker_incrementally_adjust_charges(std::vector<shared_sstable> exhausted_sstables) {
- //
- // Notify backlog tracker of an early sstable replacement triggered by incremental compaction approach.
- // Backlog tracker will be told that the exhausted sstables aren't being compacted anymore, and the
- // new sstables, which replaced the exhausted ones, are not partially written sstables and they can
- // be added to tracker like any other regular sstable in the table's set.
- // This way we prevent bogus calculation of backlog due to lack of charge adjustment whenever there's
- // an early sstable replacement.
- //
-
- _monitor_generator.remove_exhausted_sstables(exhausted_sstables);
- auto& tracker = _cf.get_compaction_strategy().get_backlog_tracker();
- for (auto& sst : _unused_sstables) {
- tracker.add_sstable(sst);
- }
- _unused_sstables.clear();
- }
-
void maybe_replace_exhausted_sstables_by_sst(shared_sstable sst) {
// Skip earlier replacement of exhausted sstables if compaction works with only single-fragment runs,
// meaning incremental compaction is disabled for this compaction.
@@ -1079,7 +1041,7 @@ class regular_compaction : public compaction {
auto exhausted_ssts = std::vector<shared_sstable>(exhausted, _sstables.end());
_replacer(get_compaction_completion_desc(exhausted_ssts, std::move(_new_unused_sstables)));
_sstables.erase(exhausted, _sstables.end());
- backlog_tracker_incrementally_adjust_charges(std::move(exhausted_ssts));
+ _monitor_generator.remove_exhausted_sstables(exhausted_ssts);
}
}

@@ -1560,8 +1522,6 @@ class resharding_compaction final : public compaction {
return "Resharded";
}

- void backlog_tracker_adjust_charges() override { }
-
compaction_writer create_compaction_writer(const dht::decorated_key& dk) override {
auto shard = dht::shard_of(*_schema, dk.token());
auto sst = _sstable_creator(shard);
diff --git a/database.hh b/database.hh
--- a/database.hh
+++ b/database.hh
@@ -546,6 +546,8 @@ private:
void add_maintenance_sstable(sstables::shared_sstable sst);
static void add_sstable_to_backlog_tracker(compaction_backlog_tracker& tracker, sstables::shared_sstable sstable);
static void remove_sstable_from_backlog_tracker(compaction_backlog_tracker& tracker, sstables::shared_sstable sstable);
+ // Update compaction backlog tracker with the same changes applied to the underlying sstable set.
+ void backlog_tracker_adjust_charges(const std::vector<sstables::shared_sstable>& old_sstables, const std::vector<sstables::shared_sstable>& new_sstables);
lw_shared_ptr<memtable> new_memtable();
future<stop_iteration> try_flush_memtable_to_sstable(lw_shared_ptr<memtable> memt, sstable_write_permit&& permit);
// Caller must keep m alive.
diff --git a/table.cc b/table.cc
--- a/table.cc
+++ b/table.cc
@@ -337,6 +337,16 @@ inline void table::remove_sstable_from_backlog_tracker(compaction_backlog_tracke
tracker.remove_sstable(std::move(sstable));
}

+void table::backlog_tracker_adjust_charges(const std::vector<sstables::shared_sstable>& old_sstables, const std::vector<sstables::shared_sstable>& new_sstables) {
+ auto& tracker = _compaction_strategy.get_backlog_tracker();
+ for (auto& sst : new_sstables) {
+ tracker.add_sstable(sst);
+ }
+ for (auto& sst : old_sstables) {
+ tracker.remove_sstable(sst);
+ }
+}
+
lw_shared_ptr<sstables::sstable_set>
table::do_add_sstable(lw_shared_ptr<sstables::sstable_set> sstables, sstables::shared_sstable sstable,
enable_backlog_tracker backlog_tracker) {
@@ -784,6 +794,8 @@ table::update_sstable_lists_on_off_strategy_completion(const std::vector<sstable
_t._main_sstables = std::move(_new_main_list);
_t._maintenance_sstables = std::move(_new_maintenance_list);
_t.refresh_compound_sstable_set();
+ // Input sstables aren't removed from backlog tracker because they come from the maintenance set.
+ _t.backlog_tracker_adjust_charges({}, _new_main);
}
static std::unique_ptr<row_cache::external_updater_impl> make(table& t, sstable_list_builder::permit_t permit, const sstables_t& old_maintenance, const sstables_t& new_main) {
return std::make_unique<sstable_lists_updater>(t, std::move(permit), old_maintenance, new_main);
@@ -855,6 +867,7 @@ table::on_compaction_completion(sstables::compaction_completion_desc& desc) {

Commit Bot

unread,
Sep 29, 2021, 10:59:12 AMSep 29
to scylla...@googlegroups.com, Raphael S. Carvalho
From: Raphael S. Carvalho <raph...@scylladb.com>
Committer: Raphael S. Carvalho <raph...@scylladb.com>
Branch: master

compaction: simplify removal of monitors

By switching to unordered_map, removal of generated monitors is
made easier. This is a preparatory change for a patch which will
remove the monitors for all exhausted sstables.

Signed-off-by: Raphael S. Carvalho <raph...@scylladb.com>

---
diff --git a/compaction/compaction.cc b/compaction/compaction.cc
--- a/compaction/compaction.cc
+++ b/compaction/compaction.cc
@@ -368,30 +368,28 @@ struct compaction_read_monitor_generator final : public read_monitor_generator {
};

virtual sstables::read_monitor& operator()(sstables::shared_sstable sst) override {
- _generated_monitors.emplace_back(std::move(sst), _cf);
- return _generated_monitors.back();
+ auto p = _generated_monitors.emplace(sst->generation(), compaction_read_monitor(sst, _cf));
+ return p.first->second;
}

explicit compaction_read_monitor_generator(column_family& cf)
: _cf(cf) {}

void remove_sstables(bool is_tracking) {
- for (auto& rm : _generated_monitors) {
+ for (auto& rm : _generated_monitors | boost::adaptors::map_values) {
rm.remove_sstable(is_tracking);
}
}

void remove_sstable(bool is_tracking, sstables::shared_sstable& sst) {
- for (auto& rm : _generated_monitors) {
- if (rm._sst == sst) {
- rm.remove_sstable(is_tracking);
- break;
- }
+ auto it = _generated_monitors.find(sst->generation());
+ if (it != _generated_monitors.end()) {
+ it->second.remove_sstable(is_tracking);
}
}

Commit Bot

unread,
Sep 29, 2021, 10:59:13 AMSep 29
to scylla...@googlegroups.com, Raphael S. Carvalho
From: Raphael S. Carvalho <raph...@scylladb.com>
Committer: Raphael S. Carvalho <raph...@scylladb.com>
Branch: master

compaction: introduce compaction_read_monitor_generator::remove_exhausted_sstables()

This new function makes it easier to remove monitor of exhausted
sstables.

Signed-off-by: Raphael S. Carvalho <raph...@scylladb.com>

---
diff --git a/compaction/compaction.cc b/compaction/compaction.cc
--- a/compaction/compaction.cc
+++ b/compaction/compaction.cc
@@ -381,10 +381,12 @@ struct compaction_read_monitor_generator final : public read_monitor_generator {
}
}

- void remove_sstable(bool is_tracking, sstables::shared_sstable& sst) {
- auto it = _generated_monitors.find(sst->generation());
- if (it != _generated_monitors.end()) {
- it->second.remove_sstable(is_tracking);
+ void remove_exhausted_sstables(bool is_tracking, const std::vector<sstables::shared_sstable>& exhausted_sstables) {
+ for (auto& sst : exhausted_sstables) {
+ auto it = _generated_monitors.find(sst->generation());
+ if (it != _generated_monitors.end()) {
+ it->second.remove_sstable(is_tracking);
+ }
}
}
private:
@@ -1039,9 +1041,7 @@ class regular_compaction : public compaction {
// an early sstable replacement.
//

- for (auto& sst : exhausted_sstables) {
- _monitor_generator.remove_sstable(_info->tracking, sst);
- }
+ _monitor_generator.remove_exhausted_sstables(_info->tracking, exhausted_sstables);

Commit Bot

unread,
Sep 29, 2021, 10:59:15 AMSep 29
to scylla...@googlegroups.com, Raphael S. Carvalho
From: Raphael S. Carvalho <raph...@scylladb.com>
Committer: Raphael S. Carvalho <raph...@scylladb.com>
Branch: master

compaction: Don't leak backlog of input sstable when compaction strategy is changed

The generic backlog formula is: ALL + PARTIAL - COMPACTING

With transfer_ongoing_charges() we already ignore the effect of
ongoing compactions on COMPACTING as we judge them to be pointless.

But ongoing compactions will run to completion, meaning that output
sstables will be added to ALL anyway, in the formula above.

With stop_tracking_ongoing_compactions(), input sstables are never
removed from the tracker, but output sstables are added, which means
we end up with duplicate backlog in the tracker.

By removing this tracking mechanism, pointless ongoing compaction
will be ignored as expected and the leaks will be fixed.

Later, the intention is to force a stop on ongoing compactions if
strategy has changed as they're pointless anyway.

Signed-off-by: Raphael S. Carvalho <raph...@scylladb.com>

---
diff --git a/compaction/compaction.cc b/compaction/compaction.cc
--- a/compaction/compaction.cc
+++ b/compaction/compaction.cc
@@ -344,11 +344,9 @@ struct compaction_read_monitor_generator final : public read_monitor_generator {
return _last_position_seen;
}

- void remove_sstable(bool is_tracking) {
- if (is_tracking && _sst) {
+ void remove_sstable() {
+ if (_sst) {
_cf.get_compaction_strategy().get_backlog_tracker().remove_sstable(_sst);
- } else if (_sst) {
- _cf.get_compaction_strategy().get_backlog_tracker().revert_charges(_sst);
}
_sst = {};
}
@@ -375,17 +373,17 @@ struct compaction_read_monitor_generator final : public read_monitor_generator {
explicit compaction_read_monitor_generator(column_family& cf)
: _cf(cf) {}

- void remove_sstables(bool is_tracking) {
+ void remove_sstables() {
for (auto& rm : _generated_monitors | boost::adaptors::map_values) {
- rm.remove_sstable(is_tracking);
+ rm.remove_sstable();
}
}

- void remove_exhausted_sstables(bool is_tracking, const std::vector<sstables::shared_sstable>& exhausted_sstables) {
+ void remove_exhausted_sstables(const std::vector<sstables::shared_sstable>& exhausted_sstables) {
for (auto& sst : exhausted_sstables) {
auto it = _generated_monitors.find(sst->generation());
if (it != _generated_monitors.end()) {
- it->second.remove_sstable(is_tracking);
+ it->second.remove_sstable();
}
}
}
@@ -991,7 +989,7 @@ class regular_compaction : public compaction {
}

void backlog_tracker_adjust_charges() override {
- _monitor_generator.remove_sstables(_info->tracking);
+ _monitor_generator.remove_sstables();
auto& tracker = _cf.get_compaction_strategy().get_backlog_tracker();
for (auto& sst : _unused_sstables) {
tracker.add_sstable(sst);
@@ -1041,7 +1039,7 @@ class regular_compaction : public compaction {
// an early sstable replacement.
//

- _monitor_generator.remove_exhausted_sstables(_info->tracking, exhausted_sstables);
+ _monitor_generator.remove_exhausted_sstables(exhausted_sstables);
auto& tracker = _cf.get_compaction_strategy().get_backlog_tracker();
for (auto& sst : _unused_sstables) {
tracker.add_sstable(sst);
diff --git a/table.cc b/table.cc
--- a/table.cc
+++ b/table.cc

Commit Bot

unread,
Sep 29, 2021, 10:59:16 AMSep 29
to scylla...@googlegroups.com, Raphael S. Carvalho
From: Raphael S. Carvalho <raph...@scylladb.com>
Committer: Raphael S. Carvalho <raph...@scylladb.com>
Branch: master

compaction: Update backlog tracker correctly when schema is updated

Currently the following can happen:
1) there's ongoing compaction with input sstable A, so sstable set
and backlog tracker both contain A.
2) ongoing compaction replaces input sstable A by B, so sstable set
contains only B now.
3) schema is updated, so a new backlog tracker is built without A
because sstable set now contains only B.
4) ongoing compaction tries to remove A from tracker, but it was
excluded in step 3.
5) tracker can now have a negative value if table is decreasing in
size, which leads to log(<negative number>) == -NaN

This problem happens because backlog tracker updates are decoupled
from sstable set updates. Given that the essential content of the
backlog tracker should be the same as that of the sstable set, let's
move tracker management to table.
Whenever sstable set is updated, backlog tracker will be updated with
the same changes, making their management less error prone.

Fixes #9157

Signed-off-by: Raphael S. Carvalho <raph...@scylladb.com>

---
diff --git a/compaction/compaction.cc b/compaction/compaction.cc
--- a/compaction/compaction.cc
+++ b/compaction/compaction.cc
@@ -346,7 +346,7 @@ struct compaction_read_monitor_generator final : public read_monitor_generator {

void remove_sstable() {
if (_sst) {
- _cf.get_compaction_strategy().get_backlog_tracker().remove_sstable(_sst);
+ _cf.get_compaction_strategy().get_backlog_tracker().revert_charges(_sst);
}
_sst = {};
}
@@ -373,12 +373,6 @@ struct compaction_read_monitor_generator final : public read_monitor_generator {
explicit compaction_read_monitor_generator(column_family& cf)
: _cf(cf) {}

- void remove_sstables() {
- for (auto& rm : _generated_monitors | boost::adaptors::map_values) {
- rm.remove_sstable();
- }
- }
-
void remove_exhausted_sstables(const std::vector<sstables::shared_sstable>& exhausted_sstables) {
for (auto& sst : exhausted_sstables) {
auto it = _generated_monitors.find(sst->generation());
@@ -749,16 +743,13 @@ class compaction {
std::chrono::duration_cast<std::chrono::milliseconds>(duration).count(), pretty_printed_throughput(_info->end_size, duration),
_info->total_partitions, _info->total_keys_written);

- backlog_tracker_adjust_charges();
-
auto info = std::move(_info);
_cf.get_compaction_manager().deregister_compaction(info);
return std::move(*info);
}

virtual std::string_view report_start_desc() const = 0;
virtual std::string_view report_finish_desc() const = 0;
- virtual void backlog_tracker_adjust_charges() { };

std::function<api::timestamp_type(const dht::decorated_key&)> max_purgeable_func() {
if (!tombstone_expiration_enabled()) {
@@ -960,7 +951,6 @@ class reshape_compaction : public compaction {
class regular_compaction : public compaction {
diff --git a/table.cc b/table.cc
--- a/table.cc
+++ b/table.cc
Reply all
Reply to author
Forward
0 new messages