[PATCH 0/7] Fix TWCS compaction aggressiveness due to data segregation


Raphael S. Carvalho

<raphaelsc@scylladb.com>
Jul 29, 2020, 2:40:32 PM
to scylladb-dev@googlegroups.com, Raphael S. Carvalho
After the data segregation feature, anything that causes out-of-order writes,
like read repair, can result in small updates to past time windows.
This causes compaction to be very aggressive because whenever a past time
window is updated like that, that time window is recompacted into a
single SSTable.
Users expect that once a window is closed, it will no longer be written
to, but that has changed since the introduction of the data segregation
feature. We didn't anticipate the write amplification issues that the
feature would cause. To fix this problem, let's perform size-tiered
compaction on the windows that are no longer active and were updated
because data was segregated. The current behavior where the last active
window is merged into one file is kept. But thereafter, that same
window will only be compacted using STCS.

Fixes #6928.

Also at: g...@github.com:raphaelsc/scylla.git fix_twcs_agressiveness_after_data_segregation

tests:
- unit: mode(dev)
- manually verified the correctness of the new behavior with debug logs

Raphael S. Carvalho (7):
compaction/twcs: Make newest_bucket() non-static
compact/twcs: Perform size-tiered compaction on past time windows
compaction/stcs: Make get_buckets() static
compaction/stcs: Export static function that estimates pending tasks
compaction/twcs: Make task estimation take into account the
size-tiered behavior
test: Check that TWCS properly performs size-tiered compaction on past
windows
compaction/twcs: Improve debug message

sstables/size_tiered_compaction_strategy.hh | 6 +-
sstables/time_window_compaction_strategy.hh | 60 ++++++++++-----
sstables/size_tiered_compaction_strategy.cc | 33 ++++----
test/boost/sstable_datafile_test.cc | 83 ++++++++++++++++++++-
4 files changed, 146 insertions(+), 36 deletions(-)

--
2.26.2

Raphael S. Carvalho

<raphaelsc@scylladb.com>
Jul 29, 2020, 2:40:34 PM
to scylladb-dev@googlegroups.com, Raphael S. Carvalho
To fix #6928, newest_bucket() will have to access the class fields.

Signed-off-by: Raphael S. Carvalho <raph...@scylladb.com>
---
sstables/time_window_compaction_strategy.hh | 2 +-
test/boost/sstable_datafile_test.cc | 8 +++++---
2 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/sstables/time_window_compaction_strategy.hh b/sstables/time_window_compaction_strategy.hh
index 31d285221..821177cca 100644
--- a/sstables/time_window_compaction_strategy.hh
+++ b/sstables/time_window_compaction_strategy.hh
@@ -260,7 +260,7 @@ class time_window_compaction_strategy : public compaction_strategy_impl {
return std::make_pair(std::move(buckets), max_timestamp);
}

- static std::vector<shared_sstable>
+ std::vector<shared_sstable>
newest_bucket(std::map<timestamp_type, std::vector<shared_sstable>> buckets, int min_threshold, int max_threshold,
std::chrono::seconds sstable_window_size, timestamp_type now, size_tiered_compaction_strategy_options& stcs_options) {
// If the current bucket has at least minThreshold SSTables, choose that one.
diff --git a/test/boost/sstable_datafile_test.cc b/test/boost/sstable_datafile_test.cc
index 9879b1b4a..e4b751c56 100644
--- a/test/boost/sstable_datafile_test.cc
+++ b/test/boost/sstable_datafile_test.cc
@@ -3168,6 +3168,8 @@ SEASTAR_TEST_CASE(time_window_strategy_correctness_test) {
sstables.push_back(make_sstable_containing(sst_gen, {std::move(mut)}));
}

+ std::map<sstring, sstring> options;
+ time_window_compaction_strategy twcs(options);
std::map<api::timestamp_type, std::vector<shared_sstable>> buckets;

// We'll put 3 sstables into the newest bucket
@@ -3177,13 +3179,13 @@ SEASTAR_TEST_CASE(time_window_strategy_correctness_test) {
}
sstables::size_tiered_compaction_strategy_options stcs_options;
auto now = api::timestamp_clock::now().time_since_epoch().count();
- auto new_bucket = time_window_compaction_strategy::newest_bucket(buckets, 4, 32, duration_cast<seconds>(hours(1)),
+ auto new_bucket = twcs.newest_bucket(buckets, 4, 32, duration_cast<seconds>(hours(1)),
time_window_compaction_strategy::get_window_lower_bound(duration_cast<seconds>(hours(1)), now), stcs_options);
// incoming bucket should not be accepted when it has below the min threshold SSTables
BOOST_REQUIRE(new_bucket.empty());

now = api::timestamp_clock::now().time_since_epoch().count();
- new_bucket = time_window_compaction_strategy::newest_bucket(buckets, 2, 32, duration_cast<seconds>(hours(1)),
+ new_bucket = twcs.newest_bucket(buckets, 2, 32, duration_cast<seconds>(hours(1)),
time_window_compaction_strategy::get_window_lower_bound(duration_cast<seconds>(hours(1)), now), stcs_options);
// incoming bucket should be accepted when it is larger than the min threshold SSTables
BOOST_REQUIRE(!new_bucket.empty());
@@ -3218,7 +3220,7 @@ SEASTAR_TEST_CASE(time_window_strategy_correctness_test) {
}

now = api::timestamp_clock::now().time_since_epoch().count();
- new_bucket = time_window_compaction_strategy::newest_bucket(buckets, 4, 32, duration_cast<seconds>(hours(1)),
+ new_bucket = twcs.newest_bucket(buckets, 4, 32, duration_cast<seconds>(hours(1)),
time_window_compaction_strategy::get_window_lower_bound(duration_cast<seconds>(hours(1)), now), stcs_options);
// new bucket should be trimmed to max threshold of 32
BOOST_REQUIRE(new_bucket.size() == size_t(32));
--
2.26.2

Raphael S. Carvalho

<raphaelsc@scylladb.com>
Jul 29, 2020, 2:40:37 PM
to scylladb-dev@googlegroups.com, Raphael S. Carvalho
After the data segregation feature, anything that causes out-of-order writes,
like read repair, can result in small updates to past time windows.
This causes compaction to be very aggressive because whenever a past time
window is updated like that, that time window is recompacted into a
single SSTable.
Users expect that once a window is closed, it will no longer be written
to, but that has changed since the introduction of the data segregation
feature. We didn't anticipate the write amplification issues that the
feature would cause. To fix this problem, let's perform size-tiered
compaction on the windows that are no longer active and were updated
because data was segregated. The current behavior where the last active
window is merged into one file is kept. But thereafter, that same
window will only be compacted using STCS.

Fixes #6928.

Signed-off-by: Raphael S. Carvalho <raph...@scylladb.com>
---
sstables/time_window_compaction_strategy.hh | 42 +++++++++++++++------
1 file changed, 31 insertions(+), 11 deletions(-)

diff --git a/sstables/time_window_compaction_strategy.hh b/sstables/time_window_compaction_strategy.hh
index 821177cca..73ab97181 100644
--- a/sstables/time_window_compaction_strategy.hh
+++ b/sstables/time_window_compaction_strategy.hh
@@ -141,6 +141,8 @@ class time_window_compaction_strategy : public compaction_strategy_impl {
int64_t _estimated_remaining_tasks = 0;
db_clock::time_point _last_expired_check;
timestamp_type _highest_window_seen;
+ // Keep track of all recent active windows that still need to be compacted into a single SSTable
+ std::unordered_set<timestamp_type> _recent_active_windows;
size_tiered_compaction_strategy_options _stcs_options;
compaction_backlog_tracker _backlog_tracker;
public:
@@ -273,20 +275,38 @@ class time_window_compaction_strategy : public compaction_strategy_impl {

clogger.trace("Key {}, now {}", key, now);

- if (bucket.size() >= size_t(min_threshold) && key >= now) {
- // If we're in the newest bucket, we'll use STCS to prioritize sstables
- auto stcs_interesting_bucket = size_tiered_compaction_strategy::most_interesting_bucket(bucket, min_threshold, max_threshold, stcs_options);
-
- // If the tables in the current bucket aren't eligible in the STCS strategy, we'll skip it and look for other buckets
- if (!stcs_interesting_bucket.empty()) {
- return stcs_interesting_bucket;
+ if (key >= now) {
+ _recent_active_windows.insert(key);
+ if (bucket.size() >= size_t(min_threshold)) {
+ // If we're in the newest bucket, we'll use STCS to prioritize sstables
+ auto stcs_interesting_bucket = size_tiered_compaction_strategy::most_interesting_bucket(bucket,
+ min_threshold, max_threshold, stcs_options);
+
+ // If the tables in the current bucket aren't eligible in the STCS strategy, we'll skip it and look for other buckets
+ if (!stcs_interesting_bucket.empty()) {
+ return stcs_interesting_bucket;
+ }
}
- } else if (bucket.size() >= 2 && key < now) {
- clogger.debug("bucket size {} >= 2 and not in current bucket, compacting what's here", bucket.size());
- return trim_to_threshold(std::move(bucket), max_threshold);
} else {
- clogger.debug("No compaction necessary for bucket size {} , key {}, now {}", bucket.size(), key, now);
+ // Let's perform STCS on all past windows but the ones that were not closed yet, to prevent the write
+ // amplification from being hurt when read repair, for example, causes small updates to past windows.
+ if (_recent_active_windows.count(key)) {
+ _recent_active_windows.erase(key);
+ if (bucket.size() >= 2) {
+ clogger.debug("bucket size {} >= 2 for the recent active window {}, compacting what's here", bucket.size(), key);
+ return trim_to_threshold(std::move(bucket), max_threshold);
+ }
+ } else {
+ auto stcs_interesting_bucket = size_tiered_compaction_strategy::most_interesting_bucket(bucket,
+ min_threshold, max_threshold, stcs_options);
+
+ if (!stcs_interesting_bucket.empty()) {
+ clogger.debug("bucket size {} >= 2 for the past window {}, compacting what's here with STCS", bucket.size(), key);
+ return stcs_interesting_bucket;
+ }
+ }
}
+ clogger.debug("No compaction necessary for bucket size {} , key {}, now {}", bucket.size(), key, now);
}
return {};
}
--
2.26.2

Raphael S. Carvalho

<raphaelsc@scylladb.com>
Jul 29, 2020, 2:40:40 PM
to scylladb-dev@googlegroups.com, Raphael S. Carvalho
STCS will export a static function to estimate pending tasks, and
it relies on get_buckets() being static too.

Signed-off-by: Raphael S. Carvalho <raph...@scylladb.com>
---
sstables/size_tiered_compaction_strategy.hh | 4 ++--
sstables/size_tiered_compaction_strategy.cc | 16 ++++++++--------
2 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/sstables/size_tiered_compaction_strategy.hh b/sstables/size_tiered_compaction_strategy.hh
index 5ca079e75..be75ac22d 100644
--- a/sstables/size_tiered_compaction_strategy.hh
+++ b/sstables/size_tiered_compaction_strategy.hh
@@ -116,10 +116,10 @@ class size_tiered_compaction_strategy : public compaction_strategy_impl {
compaction_backlog_tracker _backlog_tracker;

// Return a list of pair of shared_sstable and its respective size.
- std::vector<std::pair<sstables::shared_sstable, uint64_t>> create_sstable_and_length_pairs(const std::vector<sstables::shared_sstable>& sstables) const;
+ static std::vector<std::pair<sstables::shared_sstable, uint64_t>> create_sstable_and_length_pairs(const std::vector<sstables::shared_sstable>& sstables);

// Group files of similar size into buckets.
- std::vector<std::vector<sstables::shared_sstable>> get_buckets(const std::vector<sstables::shared_sstable>& sstables) const;
+ static std::vector<std::vector<sstables::shared_sstable>> get_buckets(const std::vector<sstables::shared_sstable>& sstables, size_tiered_compaction_strategy_options options);

// Maybe return a bucket of sstables to compact
std::vector<sstables::shared_sstable>
diff --git a/sstables/size_tiered_compaction_strategy.cc b/sstables/size_tiered_compaction_strategy.cc
index 190b9d2f3..36522cce1 100644
--- a/sstables/size_tiered_compaction_strategy.cc
+++ b/sstables/size_tiered_compaction_strategy.cc
@@ -27,7 +27,7 @@
namespace sstables {

std::vector<std::pair<sstables::shared_sstable, uint64_t>>
-size_tiered_compaction_strategy::create_sstable_and_length_pairs(const std::vector<sstables::shared_sstable>& sstables) const {
+size_tiered_compaction_strategy::create_sstable_and_length_pairs(const std::vector<sstables::shared_sstable>& sstables) {

std::vector<std::pair<sstables::shared_sstable, uint64_t>> sstable_length_pairs;
sstable_length_pairs.reserve(sstables.size());
@@ -43,7 +43,7 @@ size_tiered_compaction_strategy::create_sstable_and_length_pairs(const std::vect
}

std::vector<std::vector<sstables::shared_sstable>>
-size_tiered_compaction_strategy::get_buckets(const std::vector<sstables::shared_sstable>& sstables) const {
+size_tiered_compaction_strategy::get_buckets(const std::vector<sstables::shared_sstable>& sstables, size_tiered_compaction_strategy_options options) {
// sstables sorted by size of its data file.
auto sorted_sstables = create_sstable_and_length_pairs(sstables);

@@ -64,8 +64,8 @@ size_tiered_compaction_strategy::get_buckets(const std::vector<sstables::shared_
for (auto it = buckets.begin(); it != buckets.end(); it++) {
size_t old_average_size = it->first;

- if ((size > (old_average_size * _options.bucket_low) && size < (old_average_size * _options.bucket_high)) ||
- (size < _options.min_sstable_size && old_average_size < _options.min_sstable_size)) {
+ if ((size > (old_average_size * options.bucket_low) && size < (old_average_size * options.bucket_high)) ||
+ (size < options.min_sstable_size && old_average_size < options.min_sstable_size)) {
auto bucket = std::move(it->second);
size_t total_size = bucket.size() * old_average_size;
size_t new_average_size = (total_size + size) / (bucket.size() + 1);
@@ -141,7 +141,7 @@ size_tiered_compaction_strategy::get_sstables_for_compaction(column_family& cfs,

// TODO: Add support to filter cold sstables (for reference: SizeTieredCompactionStrategy::filterColdSSTables).

- auto buckets = get_buckets(candidates);
+ auto buckets = get_buckets(candidates, _options);

if (is_any_bucket_interesting(buckets, min_threshold)) {
std::vector<sstables::shared_sstable> most_interesting = most_interesting_bucket(std::move(buckets), min_threshold, max_threshold);
@@ -187,7 +187,7 @@ int64_t size_tiered_compaction_strategy::estimated_pending_compactions(column_fa
sstables.push_back(entry);
}

- for (auto& bucket : get_buckets(sstables)) {
+ for (auto& bucket : get_buckets(sstables, _options)) {
if (bucket.size() >= size_t(min_threshold)) {
n += std::ceil(double(bucket.size()) / max_threshold);
}
@@ -200,7 +200,7 @@ size_tiered_compaction_strategy::most_interesting_bucket(const std::vector<sstab
int min_threshold, int max_threshold, size_tiered_compaction_strategy_options options) {
size_tiered_compaction_strategy cs(options);

- auto buckets = cs.get_buckets(candidates);
+ auto buckets = cs.get_buckets(candidates, options);

std::vector<sstables::shared_sstable> most_interesting = cs.most_interesting_bucket(std::move(buckets),
min_threshold, max_threshold);
@@ -218,7 +218,7 @@ size_tiered_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> i
offstrategy_threshold = max_sstables;
}

- for (auto& bucket : get_buckets(input)) {
+ for (auto& bucket : get_buckets(input, _options)) {
if (bucket.size() >= offstrategy_threshold) {
bucket.resize(std::min(max_sstables, bucket.size()));
compaction_descriptor desc(std::move(bucket), std::optional<sstables::sstable_set>(), iop);
--
2.26.2

Raphael S. Carvalho

<raphaelsc@scylladb.com>
Jul 29, 2020, 2:40:41 PM
to scylladb-dev@googlegroups.com, Raphael S. Carvalho
That will be useful for allowing other compaction strategies that use
STCS to properly estimate the pending tasks.

Signed-off-by: Raphael S. Carvalho <raph...@scylladb.com>
---
sstables/size_tiered_compaction_strategy.hh | 2 ++
sstables/size_tiered_compaction_strategy.cc | 19 ++++++++++++-------
2 files changed, 14 insertions(+), 7 deletions(-)

diff --git a/sstables/size_tiered_compaction_strategy.hh b/sstables/size_tiered_compaction_strategy.hh
index be75ac22d..4f40a3803 100644
--- a/sstables/size_tiered_compaction_strategy.hh
+++ b/sstables/size_tiered_compaction_strategy.hh
@@ -154,6 +154,8 @@ class size_tiered_compaction_strategy : public compaction_strategy_impl {

virtual compaction_descriptor get_sstables_for_compaction(column_family& cfs, std::vector<sstables::shared_sstable> candidates) override;

+ static int64_t estimated_pending_compactions(const std::vector<sstables::shared_sstable>& sstables,
+ int min_threshold, int max_threshold, size_tiered_compaction_strategy_options options);
virtual int64_t estimated_pending_compactions(column_family& cf) const override;

virtual compaction_strategy_type type() const {
diff --git a/sstables/size_tiered_compaction_strategy.cc b/sstables/size_tiered_compaction_strategy.cc
index 36522cce1..d1f430b6d 100644
--- a/sstables/size_tiered_compaction_strategy.cc
+++ b/sstables/size_tiered_compaction_strategy.cc
@@ -176,23 +176,28 @@ size_tiered_compaction_strategy::get_sstables_for_compaction(column_family& cfs,
return sstables::compaction_descriptor();
}

+int64_t size_tiered_compaction_strategy::estimated_pending_compactions(const std::vector<sstables::shared_sstable>& sstables,
+ int min_threshold, int max_threshold, size_tiered_compaction_strategy_options options) {
+ int64_t n = 0;
+ for (auto& bucket : get_buckets(sstables, options)) {
+ if (bucket.size() >= size_t(min_threshold)) {
+ n += std::ceil(double(bucket.size()) / max_threshold);
+ }
+ }
+ return n;
+}
+
int64_t size_tiered_compaction_strategy::estimated_pending_compactions(column_family& cf) const {
int min_threshold = cf.min_compaction_threshold();
int max_threshold = cf.schema()->max_compaction_threshold();
std::vector<sstables::shared_sstable> sstables;
- int64_t n = 0;

sstables.reserve(cf.sstables_count());
for (auto& entry : *cf.get_sstables()) {
sstables.push_back(entry);
}

- for (auto& bucket : get_buckets(sstables, _options)) {
- if (bucket.size() >= size_t(min_threshold)) {
- n += std::ceil(double(bucket.size()) / max_threshold);
- }
- }
- return n;
+ return estimated_pending_compactions(sstables, min_threshold, max_threshold, _options);
}

std::vector<sstables::shared_sstable>
--
2.26.2

Raphael S. Carvalho

<raphaelsc@scylladb.com>
Jul 29, 2020, 2:40:43 PM
to scylladb-dev@googlegroups.com, Raphael S. Carvalho
The task estimation was not taking into account that TWCS does size-tiered
compaction on the current window: it only added 1 to the estimation, when
there could be more tasks than that depending on the number of SSTables in
all the size tiers. Let's also adjust task estimation to take into
account the new behavior where past windows can now also be compacted
with size-tiered.
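
As an illustration of the estimate described above, here is a minimal, self-contained sketch. It assumes a window is represented only by the number of SSTables in each of its size tiers; the function name `estimate_stcs_tasks` and that representation are hypothetical stand-ins, not the names used in the patch.

```cpp
#include <cmath>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in: each entry of tier_sizes is the number of SSTables
// in one size tier of a single time window.
int64_t estimate_stcs_tasks(const std::vector<size_t>& tier_sizes, int min_threshold, int max_threshold) {
    int64_t n = 0;
    for (size_t size : tier_sizes) {
        // A tier only produces work once it holds at least min_threshold SSTables.
        if (size >= size_t(min_threshold)) {
            // Each compaction consumes at most max_threshold SSTables.
            n += static_cast<int64_t>(std::ceil(double(size) / max_threshold));
        }
    }
    return n;
}
```

With min_threshold 4 and max_threshold 32, tiers holding 40, 3 and 4 SSTables would give 2 + 0 + 1 = 3 tasks, where the previous estimate counted the whole window as a single task.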

Signed-off-by: Raphael S. Carvalho <raph...@scylladb.com>
---
sstables/time_window_compaction_strategy.hh | 16 +++++++++++-----
1 file changed, 11 insertions(+), 5 deletions(-)

diff --git a/sstables/time_window_compaction_strategy.hh b/sstables/time_window_compaction_strategy.hh
index 73ab97181..3ea20a8ca 100644
--- a/sstables/time_window_compaction_strategy.hh
+++ b/sstables/time_window_compaction_strategy.hh
@@ -223,7 +223,7 @@ class time_window_compaction_strategy : public compaction_strategy_impl {
// Update the highest window seen, if necessary
_highest_window_seen = std::max(_highest_window_seen, p.second);

- update_estimated_compaction_by_tasks(p.first, cf.min_compaction_threshold());
+ update_estimated_compaction_by_tasks(p.first, cf.min_compaction_threshold(), cf.schema()->max_compaction_threshold());

return newest_bucket(std::move(p.first), cf.min_compaction_threshold(), cf.schema()->max_compaction_threshold(),
_options.sstable_window_size, _highest_window_seen, _stcs_options);
@@ -332,7 +332,7 @@ class time_window_compaction_strategy : public compaction_strategy_impl {
return timestamp_type(std::chrono::duration_cast<std::chrono::microseconds>(options.get_sstable_window_size()).count());
}
private:
- void update_estimated_compaction_by_tasks(std::map<timestamp_type, std::vector<shared_sstable>>& tasks, int min_threshold) {
+ void update_estimated_compaction_by_tasks(std::map<timestamp_type, std::vector<shared_sstable>>& tasks, int min_threshold, int max_threshold) {
int64_t n = 0;
timestamp_type now = _highest_window_seen;

@@ -342,9 +342,15 @@ class time_window_compaction_strategy : public compaction_strategy_impl {
// For current window, make sure it's compactable
auto count = task.second.size();
if (key >= now && count >= size_t(min_threshold)) {
- n++;
- } else if (key < now && count >= 2) {
- n++;
+ n += size_tiered_compaction_strategy::estimated_pending_compactions(task.second,
+ min_threshold, max_threshold, _stcs_options);
+ } else if (key < now) {
+ if (_recent_active_windows.count(key) && count >= 2) {
+ n++;
+ } else if (count >= size_t(min_threshold)) {
+ n += size_tiered_compaction_strategy::estimated_pending_compactions(task.second,
+ min_threshold, max_threshold, _stcs_options);
+ }
}
}
_estimated_remaining_tasks = n;
--
2.26.2

Raphael S. Carvalho

<raphaelsc@scylladb.com>
Jul 29, 2020, 2:40:45 PM
to scylladb-dev@googlegroups.com, Raphael S. Carvalho
Signed-off-by: Raphael S. Carvalho <raph...@scylladb.com>
---
test/boost/sstable_datafile_test.cc | 75 +++++++++++++++++++++++++++++
1 file changed, 75 insertions(+)

diff --git a/test/boost/sstable_datafile_test.cc b/test/boost/sstable_datafile_test.cc
index e4b751c56..07266710f 100644
--- a/test/boost/sstable_datafile_test.cc
+++ b/test/boost/sstable_datafile_test.cc
@@ -3227,6 +3227,81 @@ SEASTAR_TEST_CASE(time_window_strategy_correctness_test) {
});
}

+// Check that TWCS will only perform size-tiered on the current window and also
+// the past windows that were already previously compacted into a single SSTable.
+SEASTAR_TEST_CASE(time_window_strategy_size_tiered_behavior_correctness) {
+ using namespace std::chrono;
+
+ return test_env::do_with_async([] (test_env& env) {
+ storage_service_for_tests ssft;
+ auto s = schema_builder("tests", "time_window_strategy")
+ .with_column("id", utf8_type, column_kind::partition_key)
+ .with_column("value", int32_type).build();
+
+ auto tmp = tmpdir();
+ auto sst_gen = [&env, s, &tmp, gen = make_lw_shared<unsigned>(1)] () mutable {
+ return env.make_sstable(s, tmp.path().string(), (*gen)++, la, big);
+ };
+
+ auto make_insert = [&] (partition_key key, api::timestamp_type t) {
+ mutation m(s, key);
+ m.set_clustered_cell(clustering_key::make_empty(), bytes("value"), data_value(int32_t(1)), t);
+ return m;
+ };
+
+ std::map<sstring, sstring> options;
+ sstables::size_tiered_compaction_strategy_options stcs_options;
+ time_window_compaction_strategy twcs(options);
+ std::map<api::timestamp_type, std::vector<shared_sstable>> buckets; // windows
+ int min_threshold = 4;
+ int max_threshold = 32;
+ auto window_size = duration_cast<seconds>(hours(1));
+
+ auto add_new_sstable_to_bucket = [&] (api::timestamp_type ts, api::timestamp_type window_ts) {
+ auto key = partition_key::from_exploded(*s, {to_bytes("key" + to_sstring(ts))});
+ auto mut = make_insert(std::move(key), ts);
+ auto sst = make_sstable_containing(sst_gen, {std::move(mut)});
+ auto bound = time_window_compaction_strategy::get_window_lower_bound(window_size, window_ts);
+ buckets[bound].push_back(std::move(sst));
+ };
+
+ api::timestamp_type current_window_ts = api::timestamp_clock::now().time_since_epoch().count();
+ api::timestamp_type past_window_ts = current_window_ts - duration_cast<microseconds>(seconds(2L * 3600L)).count();
+
+ // create 1 sstable into past time window and let the strategy know about it
+ add_new_sstable_to_bucket(0, past_window_ts);
+
+ auto now = time_window_compaction_strategy::get_window_lower_bound(window_size, past_window_ts);
+
+ // past window cannot be compacted because it has a single SSTable
+ BOOST_REQUIRE(twcs.newest_bucket(buckets, min_threshold, max_threshold, window_size, now, stcs_options).size() == 0);
+
+ // create min_threshold-1 sstables into current time window
+ for (api::timestamp_type t = 0; t < min_threshold - 1; t++) {
+ add_new_sstable_to_bucket(t, current_window_ts);
+ }
+ // add 1 sstable into past window.
+ add_new_sstable_to_bucket(1, past_window_ts);
+
+ now = time_window_compaction_strategy::get_window_lower_bound(window_size, current_window_ts);
+
+ // past window can now be compacted into a single SSTable because it was the previous current (active) window.
+ // current window cannot be compacted because it has less than min_threshold SSTables
+ BOOST_REQUIRE(twcs.newest_bucket(buckets, min_threshold, max_threshold, window_size, now, stcs_options).size() == 2);
+
+ // now past window cannot be compacted again, because it was already compacted into a single SSTable, now it switches to STCS mode.
+ BOOST_REQUIRE(twcs.newest_bucket(buckets, min_threshold, max_threshold, window_size, now, stcs_options).size() == 0);
+
+ // make past window contain more than min_threshold similar-sized SSTables, allowing it to be compacted again.
+ for (api::timestamp_type t = 2; t < min_threshold; t++) {
+ add_new_sstable_to_bucket(t, past_window_ts);
+ }
+
+ // now past window can be compacted again because it switched to STCS mode and has more than min_threshold SSTables.
+ BOOST_REQUIRE(twcs.newest_bucket(buckets, min_threshold, max_threshold, window_size, now, stcs_options).size() == size_t(min_threshold));
+ });
+}
+
SEASTAR_TEST_CASE(test_promoted_index_read) {
// create table promoted_index_read (
// pk int,
--
2.26.2

Raphael S. Carvalho

<raphaelsc@scylladb.com>
Jul 29, 2020, 2:40:47 PM
to scylladb-dev@googlegroups.com, Raphael S. Carvalho
It's helpful to know the number of SSTables in each window too.

Signed-off-by: Raphael S. Carvalho <raph...@scylladb.com>
---
sstables/time_window_compaction_strategy.hh | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sstables/time_window_compaction_strategy.hh b/sstables/time_window_compaction_strategy.hh
index 3ea20a8ca..b261e27bb 100644
--- a/sstables/time_window_compaction_strategy.hh
+++ b/sstables/time_window_compaction_strategy.hh
@@ -273,7 +273,7 @@ class time_window_compaction_strategy : public compaction_strategy_impl {
auto key = key_bucket.first;
auto& bucket = key_bucket.second;

- clogger.trace("Key {}, now {}", key, now);
+ clogger.trace("Key {}, now {}, size {}", key, now, bucket.size());

if (key >= now) {
_recent_active_windows.insert(key);
--
2.26.2

Raphael S. Carvalho

<raphaelsc@scylladb.com>
Jul 30, 2020, 4:05:12 PM
to scylladb-dev, Maintainers
ping

Benny Halevy

<bhalevy@scylladb.com>
Aug 2, 2020, 10:54:43 AM
to Raphael S. Carvalho, scylladb-dev@googlegroups.com
erase returns the number of elements removed so you don't
need the call to count surrounding it,
if (_recent_active_windows.erase(key)) {...
would do.
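
A small stand-alone sketch of that point, using a plain `std::unordered_set<int64_t>` in place of `_recent_active_windows`. The function name `take_recent_window` is made up for illustration only.

```cpp
#include <cstdint>
#include <unordered_set>

// Returns true if `key` was tracked and removes it; returns false otherwise.
// erase(key) reports how many elements it removed (0 or 1 for a set), so a
// single hash lookup replaces the count() + erase() pair.
bool take_recent_window(std::unordered_set<int64_t>& recent, int64_t key) {
    return recent.erase(key) > 0;
}
```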

> + if (bucket.size() >= 2) {
> + clogger.debug("bucket size {} >= 2 for the recent active window {}, compacting what's here", bucket.size(), key);
> + return trim_to_threshold(std::move(bucket), max_threshold);
> + }
> + } else {
> + auto stcs_interesting_bucket = size_tiered_compaction_strategy::most_interesting_bucket(bucket,
> + min_threshold, max_threshold, stcs_options);
> +
> + if (!stcs_interesting_bucket.empty()) {
> + clogger.debug("bucket size {} >= 2 for the past window {}, compacting what's here with STCS", bucket.size(), key);
> + return stcs_interesting_bucket;
> + }
> + }

Why is this method in the header file?
It's a good opportunity to move it out to the .cc file.
Also, can the repeated code for STCS be refactored into a function please?

Benny Halevy

<bhalevy@scylladb.com>
Aug 2, 2020, 11:00:00 AM
to Raphael S. Carvalho, scylladb-dev@googlegroups.com
On Wed, 2020-07-29 at 15:39 -0300, Raphael S. Carvalho wrote:
To reduce the code churn, how about renaming it
to e.g. get_buckets_by_options and have the get_buckets
(non static) method call it with `_options`?
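
A rough sketch of what that suggestion could look like. Everything here is a simplified stand-in: SSTables are reduced to their sizes, and `stcs_options_sketch`, `stcs_sketch` and the default option values are assumptions for illustration, not the real Scylla types.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for size_tiered_compaction_strategy_options.
struct stcs_options_sketch {
    double bucket_low = 0.5;
    double bucket_high = 1.5;
    uint64_t min_sstable_size = 50;
};

using size_buckets = std::vector<std::vector<uint64_t>>;

class stcs_sketch {
    stcs_options_sketch _options;
public:
    explicit stcs_sketch(stcs_options_sketch options) : _options(options) {}

    // Static variant: usable by other strategies that carry their own options.
    static size_buckets get_buckets_by_options(std::vector<uint64_t> sizes, const stcs_options_sketch& options) {
        std::sort(sizes.begin(), sizes.end());
        size_buckets buckets;
        std::vector<double> averages; // averages[i] is the mean size of buckets[i]
        for (uint64_t size : sizes) {
            const double s = static_cast<double>(size);
            bool placed = false;
            for (size_t i = 0; i < buckets.size() && !placed; ++i) {
                const double avg = averages[i];
                const bool similar = s > avg * options.bucket_low && s < avg * options.bucket_high;
                const bool both_small = size < options.min_sstable_size
                        && avg < static_cast<double>(options.min_sstable_size);
                if (similar || both_small) {
                    averages[i] = (avg * buckets[i].size() + s) / (buckets[i].size() + 1);
                    buckets[i].push_back(size);
                    placed = true;
                }
            }
            if (!placed) {
                buckets.push_back(std::vector<uint64_t>{size});
                averages.push_back(s);
            }
        }
        return buckets;
    }

    // Existing non-static entry point keeps its signature, so callers are untouched.
    size_buckets get_buckets(const std::vector<uint64_t>& sizes) const {
        return get_buckets_by_options(sizes, _options);
    }
};
```

The point of the wrapper is that existing `get_buckets(...)` call sites would not need to change, which is the churn reduction being asked for.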

Benny Halevy

<bhalevy@scylladb.com>
Aug 2, 2020, 11:16:19 AM
to Raphael S. Carvalho, scylladb-dev@googlegroups.com
On Wed, 2020-07-29 at 15:39 -0300, Raphael S. Carvalho wrote:
This code being correct depends on newest_bucket's logic, right?
It's impossible to keep them in sync this way.

Can we have it return the respective estimated_pending_compactions
when it chooses stcs, and 1 in the "compact to single sstable" path
and pass the resulting number to this method?

Benny Halevy

<bhalevy@scylladb.com>
Aug 2, 2020, 11:20:46 AM
to Raphael S. Carvalho, scylladb-dev@googlegroups.com
On Wed, 2020-07-29 at 15:39 -0300, Raphael S. Carvalho wrote:
There's no context to this message so one has to have the code in front of oneself
to decipher the information. How about being a bit more verbose, like:

clogger.trace("newest_bucket: now={}: bucket.key={} bucket.size={}", now, key, bucket.size());

Avi Kivity

<avi@scylladb.com>
Aug 2, 2020, 11:41:52 AM
to Raphael S. Carvalho, scylladb-dev@googlegroups.com

On 29/07/2020 21.39, Raphael S. Carvalho wrote:
> After the data segregation feature, anything that causes out-of-order writes,
> like read repair, can result in small updates to past time windows.
> This causes compaction to be very aggressive because whenever a past time
> window is updated like that, that time window is recompacted into a
> single SSTable.
> Users expect that once a window is closed, it will no longer be written
> to, but that has changed since the introduction of the data segregation
> feature. We didn't anticipate the write amplification issues that the
> feature would cause. To fix this problem, let's perform size-tiered
> compaction on the windows that are no longer active and were updated
> because data was segregated. The current behavior where the last active
> window is merged into one file is kept. But thereafter, that same
> window will only be compacted using STCS.
>

I think we can also do a major compaction on old buckets, in order to
reduce read amplification. Consider this algorithm:


if (now - max(disk_timestamp(sstable) for sstable in bucket) >= 0.75 *
(now - bucket_end_timestamp))

   do major compaction


Where disk_timestamp refers to the on-disk file timestamp, not mutation
timestamps.


So if we have week-sized windows, we'll compact window 4 if we have a
three-week-old sstable. We'll compact window 12 if we have a 9-week-old
sstable (0.75 * 12 = 9). This ensures that we only compact rarely, but we
do compact from time to time so that reads only access a single sstable.
This automatically schedules a major compaction for recent windows.
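
The condition above can be sketched as a small predicate. This is only an illustration under stated assumptions: timestamps are modeled as `std::chrono::system_clock` time points, and `should_major_compact` and its parameters are hypothetical names, not existing Scylla code.

```cpp
#include <algorithm>
#include <chrono>
#include <vector>

using clock_point = std::chrono::system_clock::time_point;

// disk_timestamps holds the on-disk file timestamps of the SSTables in one
// bucket (not mutation timestamps); bucket_end is the end of that window.
bool should_major_compact(clock_point now, clock_point bucket_end,
                          const std::vector<clock_point>& disk_timestamps) {
    if (disk_timestamps.empty()) {
        return false;
    }
    const auto newest = *std::max_element(disk_timestamps.begin(), disk_timestamps.end());
    const auto newest_age = now - newest;     // age of the most recently written file
    const auto window_age = now - bucket_end; // how long ago the window closed
    // newest_age >= 0.75 * window_age, written with integers to avoid rounding.
    return 4 * newest_age >= 3 * window_age;
}
```

With week-sized windows, a window that closed four weeks ago would qualify once its newest file is at least three weeks old.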


I don't propose doing this now.


> Fixes #6928.
>
> Signed-off-by: Raphael S. Carvalho <raph...@scylladb.com>
> ---
> sstables/time_window_compaction_strategy.hh | 42 +++++++++++++++------
> 1 file changed, 31 insertions(+), 11 deletions(-)
>
> diff --git a/sstables/time_window_compaction_strategy.hh b/sstables/time_window_compaction_strategy.hh
> index 821177cca..73ab97181 100644
> --- a/sstables/time_window_compaction_strategy.hh
> +++ b/sstables/time_window_compaction_strategy.hh
> @@ -141,6 +141,8 @@ class time_window_compaction_strategy : public compaction_strategy_impl {
> int64_t _estimated_remaining_tasks = 0;
> db_clock::time_point _last_expired_check;
> timestamp_type _highest_window_seen;
> + // Keep track of all recent active windows that still need to be compacted into a single SSTable
> + std::unordered_set<timestamp_type> _recent_active_windows;


Note that this isn't persisted, so a restart breaks the logic. I don't
think we should persist it, instead we should use the on-disk metadata
like I proposed above. It's good enough for now.

Raphael S. Carvalho

<raphaelsc@scylladb.com>
Aug 3, 2020, 8:41:47 AM
to Benny Halevy, scylladb-dev
Makes sense, it will be even nicer to rely on function overloading
like this: https://godbolt.org/z/31EW5T

Raphael S. Carvalho

<raphaelsc@scylladb.com>
Aug 3, 2020, 8:53:16 AM
to Benny Halevy, scylladb-dev
Thanks.

This message is printed once for each window bucket, in sequence. I
use it constantly for observing the TWCS behavior, and I'd rather see
the window key before `now`. Also, the more characters there are, the
harder it becomes to debug this.

I am thinking about coalescing all this within a single log msg, like this:
newest_bucket:
now={}
buckets={
key={}, size={},
...
}

I will probably overload operator<< for window buckets when
implementing this, making room for future enhancements. If I make this
a single log msg, I am even considering making it debug rather than
trace, as this msg will be very useful when debugging TWCS.
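
A minimal sketch of what such an operator<< overload could look like, producing the single-message layout shown above. The `window_buckets` struct and its fields are hypothetical stand-ins, not the actual TWCS types, and a sorted std::map is assumed so that windows print in key order.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <ostream>
#include <sstream>
#include <string>
#include <vector>

using timestamp_type = std::int64_t;

// Hypothetical stand-in for the TWCS window buckets: window key -> SSTable names.
struct window_buckets {
    timestamp_type now = 0;
    std::map<timestamp_type, std::vector<std::string>> buckets;
};

// Prints every window bucket in one message instead of one message per bucket.
std::ostream& operator<<(std::ostream& os, const window_buckets& wb) {
    os << "newest_bucket:\nnow=" << wb.now << "\nbuckets={\n";
    for (const auto& [key, sstables] : wb.buckets) {
        // Window key first, then the number of SSTables in that window.
        os << "key=" << key << ", size=" << sstables.size() << ",\n";
    }
    return os << "}";
}
```

With an overload like this, the whole state could be passed to the logger as one argument, which would also make it easy to add fields later.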

Raphael S. Carvalho

<raphaelsc@scylladb.com>
Aug 3, 2020, 9:05:10 AM
to Benny Halevy, scylladb-dev
Correct.

> It's impossible to keep them in sync this way.

Not impossible, but somewhat hard :-) We don't expect newest_bucket()
to change frequently, so it's not something we'll worry about
frequently.

>
> Can we have it return the respective estimated_pending_compactions
> when it chooses stcs, and 1 in the "compact to single sstable" path
> and pass the resulting number to this method?

estimated_pending_compactions() has to calculate all the pending tasks
on all the existing window buckets. newest_bucket() stops iterating
through the windows as soon as it finds a good target. What I can do
is introduce a patch with a helper function that captures whether a
time window requires STCS or "major compaction", and reuse it in both
places, thereby avoiding the duplication you dislike.

Raphael S. Carvalho

<raphaelsc@scylladb.com>
Aug 3, 2020, 9:12:34 AM
to Benny Halevy, scylladb-dev
Thanks! So we'll avoid having to look up the key twice. Alternatively,
I think I could do the following to make the code more intuitive:

if ((auto it = _recent.find(key)) != _recent.end()) {
_recent.erase(it);
...




>
> > + if (bucket.size() >= 2) {
> > + clogger.debug("bucket size {} >= 2 for the recent active window {}, compacting what's here", bucket.size(), key);
> > + return trim_to_threshold(std::move(bucket), max_threshold);
> > + }
> > + } else {
> > + auto stcs_interesting_bucket = size_tiered_compaction_strategy::most_interesting_bucket(bucket,
> > + min_threshold, max_threshold, stcs_options);
> > +
> > + if (!stcs_interesting_bucket.empty()) {
> > + clogger.debug("bucket size {} >= 2 for the past window {}, compacting what's here with STCS", bucket.size(), key);
> > + return stcs_interesting_bucket;
> > + }
> > + }
>
> Why is this method in the header file?
> It's a good opportunity to move it out to the .cc file.

Alright.

> Also, can the repeated code for stcs be refactored into a function please?

Ok.

Raphael S. Carvalho

<raphaelsc@scylladb.com>
Aug 3, 2020, 9:46:06 AM
to Avi Kivity, scylladb-dev
Cool!

I think the 0.75 factor should probably be made a parameter. Not
having a parameter that allows users to control a new behavior turned
out to be a problem.

For example, let's say that read repair is constantly pushing small
updates to all windows.

If we have windows from 1 to 100, it could happen that many windows
will have major compaction scheduled at roughly the same time.

For example, windows 2, 4, 8, and 12 could be scheduled in 1.5, 3, 6, and
9 weeks, respectively. Eventually their major compaction schedules
could overlap in time. That may impact some users, so we probably
should have an on/off switch for this automatic major compaction.
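
To make the arithmetic concrete, here is a small sketch of the schedule with the factor as a parameter. It assumes week-sized windows and that "window N" means a window N weeks in the past; the function name is hypothetical and this is only one reading of the proposal, not code from the patch.

```cpp
// Hypothetical helper: the age (in weeks) an SSTable's on-disk timestamp must
// reach before a window that is `window_age_weeks` old gets major compacted.
// The 0.75 default is the factor discussed in this thread.
constexpr double major_compaction_age_weeks(double window_age_weeks, double factor = 0.75) {
    return factor * window_age_weeks;
}

// The schedule from the example above: windows 2, 4, 8 and 12.
static_assert(major_compaction_age_weeks(2) == 1.5);
static_assert(major_compaction_age_weeks(4) == 3.0);
static_assert(major_compaction_age_weeks(8) == 6.0);
static_assert(major_compaction_age_weeks(12) == 9.0);
```

Exposing `factor` would let users spread the schedule out, and an on/off switch could simply skip this check.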


>
>
> I don't propose doing this now.
>
>
> > Fixes #6928.
> >
> > Signed-off-by: Raphael S. Carvalho <raph...@scylladb.com>
> > ---
> > sstables/time_window_compaction_strategy.hh | 42 +++++++++++++++------
> > 1 file changed, 31 insertions(+), 11 deletions(-)
> >
> > diff --git a/sstables/time_window_compaction_strategy.hh b/sstables/time_window_compaction_strategy.hh
> > index 821177cca..73ab97181 100644
> > --- a/sstables/time_window_compaction_strategy.hh
> > +++ b/sstables/time_window_compaction_strategy.hh
> > @@ -141,6 +141,8 @@ class time_window_compaction_strategy : public compaction_strategy_impl {
> > int64_t _estimated_remaining_tasks = 0;
> > db_clock::time_point _last_expired_check;
> > timestamp_type _highest_window_seen;
> > + // Keep track of all recent active windows that still need to be compacted into a single SSTable
> > + std::unordered_set<timestamp_type> _recent_active_windows;
>
>
> Note that this isn't persisted, so a restart breaks the logic.

Indeed.

> I don't think we should persist it, instead we should use the on-disk
> metadata like I proposed above. It's good enough for now.

If Scylla crashes, we'd still be able to major compact the current
window once it's closed, but indeed the past window which wasn't major
compacted yet would only be compacted with STCS. I think it's not hard
to rebuild _recent_active_windows across restarts, by looking only at
the past 2 windows, but this is rare enough that it's perhaps not
worth optimizing for.

Avi Kivity

<avi@scylladb.com>
Aug 3, 2020, 10:12:01 AM
to Raphael S. Carvalho, scylladb-dev
Well, we still have the compaction weight lock.

Benny Halevy

<bhalevy@scylladb.com>
Aug 4, 2020, 2:03:53 AM
to Raphael S. Carvalho, scylladb-dev
That sounds better.
Where are you going to keep the state
to make sure they reach the same result? (e.g. timestamps, recent windows, etc.)

Benny Halevy

<bhalevy@scylladb.com>
Aug 4, 2020, 5:23:52 AM
to Raphael S. Carvalho, scylladb-dev
That's possible too.
The scope of auto it looks strange to me.
Usually it's evaluated before the `if` statement.
But maybe it's an acquired taste... :)

Avi Kivity

<avi@scylladb.com>
Aug 4, 2020, 5:56:18 AM
to Benny Halevy, Raphael S. Carvalho, scylladb-dev
I think that's illegal. Correct syntax is


    if (auto it = _recent.find(key); it != _recent.end()) {


This initializer-in-if syntax is helpful when we want to destroy the
created variable quickly, rather than let it survive the entire function.
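
For reference, a self-contained sketch of the C++17 if-with-initializer form applied to the lookup-then-erase case discussed here. The function name is hypothetical and the set stands in for _recent_active_windows.

```cpp
#include <cassert>
#include <cstdint>
#include <unordered_set>

using timestamp_type = std::int64_t;

// Hypothetical helper: removes `key` from the set of recent active windows
// and reports whether it was present, using a single lookup.
bool take_recent_window(std::unordered_set<timestamp_type>& recent, timestamp_type key) {
    // `it` is scoped to the if statement (including any else branch).
    if (auto it = recent.find(key); it != recent.end()) {
        recent.erase(it);  // erase by iterator, no second hash lookup
        return true;
    }
    return false;
}
```

The iterator goes out of scope right after the if statement, so it cannot be used by accident once the element is erased.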

Raphael S. Carvalho

<raphaelsc@scylladb.com>
Aug 4, 2020, 6:41:17 AM
to Benny Halevy, scylladb-dev
get_compaction_candidates() calls
update_estimated_compaction_by_tasks() and newest_bucket() with the
same set of window buckets. So if they reuse the knowledge of whether
a window needs STCS or major compaction, then they will reach the same
result. With this, I can even simplify the logic of newest_bucket(),
and of course of update_estimated_compaction_by_tasks() too.
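
A rough sketch of what such a shared helper might look like, so that both callers classify a window the same way. All names below (`window_compaction_mode`, `mode_for_window`, `estimated_tasks_for_window`) are hypothetical, and the STCS estimate is passed in as a plain number rather than computed; the "1 task for the single-SSTable path" rule follows the suggestion made earlier in the thread.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <unordered_set>

using timestamp_type = std::int64_t;

// Hypothetical: how a given time window should be compacted.
enum class window_compaction_mode { major, size_tiered };

// Single source of truth, usable by both the candidate picker and the estimator.
window_compaction_mode mode_for_window(const std::unordered_set<timestamp_type>& recent_active_windows,
                                       timestamp_type window_key) {
    return recent_active_windows.count(window_key) ? window_compaction_mode::major
                                                   : window_compaction_mode::size_tiered;
}

// Assumed estimation rule: one task when a recent window still has to be
// merged into a single SSTable, otherwise whatever STCS estimates.
std::int64_t estimated_tasks_for_window(window_compaction_mode mode, std::size_t bucket_size,
                                        std::int64_t stcs_estimate) {
    if (mode == window_compaction_mode::major) {
        return bucket_size >= 2 ? 1 : 0;
    }
    return stcs_estimate;
}
```

Because both paths would call `mode_for_window()` on the same set of window buckets, they cannot drift apart the way two hand-written copies of the condition could.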

Benny Halevy

<bhalevy@scylladb.com>
Aug 4, 2020, 9:39:22 AM
to Avi Kivity, Raphael S. Carvalho, scylladb-dev
yes, starting c++17 apparently (looks like a for loop without the "increment" part)

Raphael S. Carvalho

<raphaelsc@scylladb.com>
Aug 4, 2020, 10:03:07 AM
to Avi Kivity, scylladb-dev
Indeed, but my point is that such complicated behaviors may take users
by surprise, as data segregation by timestamp did for one user, so
having on/off switches for things like this is very valuable from a
user perspective. Start with the new behavior disabled by default,
evangelize it, let users enable it optionally, and once we're
confident about it, enable it by default, still allowing users to
disable it.