From: Avi Kivity <a...@scylladb.com>
Committer: Avi Kivity <a...@scylladb.com>
Branch: next
Revert "repair: Get rid of the gc_grace_seconds"
This reverts commit a8ad385ecd3e2b372db3c354492dbe57d9d91760. It
crashes on rest_api test in release mode.
Fixes #9881.
---
diff --git a/CMakeLists.txt b/CMakeLists.txt
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -537,8 +537,6 @@ set(scylla_sources
raft/tracker.cc
range_tombstone.cc
range_tombstone_list.cc
- tombstone_gc_options.cc
- tombstone_gc.cc
reader_concurrency_semaphore.cc
redis/abstract_command.cc
redis/command_factory.cc
diff --git a/compaction/compaction.cc b/compaction/compaction.cc
--- a/compaction/compaction.cc
+++ b/compaction/compaction.cc
@@ -76,7 +76,6 @@
#include "utils/UUID_gen.hh"
#include "utils/utf8.hh"
#include "utils/fmt-compat.hh"
-#include "tombstone_gc.hh"
namespace sstables {
@@ -644,7 +643,7 @@ class compaction {
void setup() {
auto ssts = make_lw_shared<sstables::sstable_set>(make_sstable_set_for_input());
formatted_sstables_list formatted_msg;
- auto fully_expired = _table_s.fully_expired_sstables(_sstables, gc_clock::now());
+ auto fully_expired = _table_s.fully_expired_sstables(_sstables);
min_max_tracker<api::timestamp_type> timestamp_tracker;
for (auto& sst : _sstables) {
@@ -734,6 +733,7 @@ class compaction {
max_purgeable_func(),
get_compacted_fragments_writer(),
noop_compacted_fragments_consumer());
+
reader.consume_in_thread(std::move(cfc));
});
});
@@ -1746,7 +1746,7 @@ compact_sstables(sstables::compaction_descriptor descriptor, compaction_data& cd
}
std::unordered_set<sstables::shared_sstable>
-get_fully_expired_sstables(const table_state& table_s, const std::vector<sstables::shared_sstable>& compacting, gc_clock::time_point compaction_time) {
+get_fully_expired_sstables(const table_state& table_s, const std::vector<sstables::shared_sstable>& compacting, gc_clock::time_point gc_before) {
clogger.debug("Checking droppable sstables in {}.{}", table_s.schema()->ks_name(), table_s.schema()->cf_name());
if (compacting.empty()) {
@@ -1760,7 +1760,6 @@ get_fully_expired_sstables(const table_state& table_s, const std::vector<sstable
int64_t min_timestamp = std::numeric_limits<int64_t>::max();
for (auto& sstable : overlapping) {
- auto gc_before = sstable->get_gc_before_for_fully_expire(compaction_time);
if (sstable->get_max_local_deletion_time() >= gc_before) {
min_timestamp = std::min(min_timestamp, sstable->get_stats_metadata().min_timestamp);
}
@@ -1779,7 +1778,6 @@ get_fully_expired_sstables(const table_state& table_s, const std::vector<sstable
// SStables that do not contain live data is added to list of possibly expired sstables.
for (auto& candidate : compacting) {
- auto gc_before = candidate->get_gc_before_for_fully_expire(compaction_time);
clogger.debug("Checking if candidate of generation {} and max_deletion_time {} is expired, gc_before is {}",
candidate->generation(), candidate->get_stats_metadata().max_local_deletion_time, gc_before);
// A fully expired sstable which has an ancestor undeleted shouldn't be compacted because
@@ -1800,12 +1798,11 @@ get_fully_expired_sstables(const table_state& table_s, const std::vector<sstable
if (candidate->get_stats_metadata().max_timestamp >= min_timestamp) {
it = candidates.erase(it);
} else {
- clogger.debug("Dropping expired SSTable {} (maxLocalDeletionTime={})",
- candidate->get_filename(), candidate->get_stats_metadata().max_local_deletion_time);
+ clogger.debug("Dropping expired SSTable {} (maxLocalDeletionTime={}, gcBefore={})",
+ candidate->get_filename(), candidate->get_stats_metadata().max_local_deletion_time, gc_before);
it++;
}
}
- clogger.debug("Checking droppable sstables in {}.{}, candidates={}", table_s.schema()->ks_name(), table_s.schema()->cf_name(), candidates.size());
return candidates;
}
diff --git a/compaction/compaction_strategy.cc b/compaction/compaction_strategy.cc
--- a/compaction/compaction_strategy.cc
+++ b/compaction/compaction_strategy.cc
@@ -68,7 +68,7 @@ compaction_descriptor compaction_strategy_impl::get_major_compaction_job(table_s
return compaction_descriptor(std::move(candidates), table_s.get_sstable_set(), service::get_local_compaction_priority());
}
-bool compaction_strategy_impl::worth_dropping_tombstones(const shared_sstable& sst, gc_clock::time_point compaction_time) {
+bool compaction_strategy_impl::worth_dropping_tombstones(const shared_sstable& sst, gc_clock::time_point gc_before) {
if (_disable_tombstone_compaction) {
return false;
}
@@ -79,7 +79,6 @@ bool compaction_strategy_impl::worth_dropping_tombstones(const shared_sstable& s
if (db_clock::now()-_tombstone_compaction_interval < sst->data_file_write_time()) {
return false;
}
- auto gc_before = sst->get_gc_before_for_drop_estimation(compaction_time);
return sst->estimate_droppable_tombstone_ratio(gc_before) >= _tombstone_threshold;
}
@@ -422,20 +421,20 @@ time_window_compaction_strategy::time_window_compaction_strategy(const std::map<
} // namespace sstables
std::vector<sstables::shared_sstable>
-date_tiered_manifest::get_next_sstables(table_state& table_s, std::vector<sstables::shared_sstable>& uncompacting, gc_clock::time_point compaction_time) {
+date_tiered_manifest::get_next_sstables(table_state& table_s, std::vector<sstables::shared_sstable>& uncompacting, gc_clock::time_point gc_before) {
if (table_s.get_sstable_set().all()->empty()) {
return {};
}
// Find fully expired SSTables. Those will be included no matter what.
- auto expired = table_s.fully_expired_sstables(uncompacting, compaction_time);
+ auto expired = table_s.fully_expired_sstables(uncompacting);
if (!expired.empty()) {
auto is_expired = [&] (const sstables::shared_sstable& s) { return expired.contains(s); };
uncompacting.erase(boost::remove_if(uncompacting, is_expired), uncompacting.end());
}
- auto compaction_candidates = get_next_non_expired_sstables(table_s, uncompacting, compaction_time);
+ auto compaction_candidates = get_next_non_expired_sstables(table_s, uncompacting, gc_before);
if (!expired.empty()) {
compaction_candidates.insert(compaction_candidates.end(), expired.begin(), expired.end());
}
@@ -465,7 +464,7 @@ int64_t date_tiered_manifest::get_estimated_tasks(table_state& table_s) const {
}
std::vector<sstables::shared_sstable>
-date_tiered_manifest::get_next_non_expired_sstables(table_state& table_s, std::vector<sstables::shared_sstable>& non_expiring_sstables, gc_clock::time_point compaction_time) {
+date_tiered_manifest::get_next_non_expired_sstables(table_state& table_s, std::vector<sstables::shared_sstable>& non_expiring_sstables, gc_clock::time_point gc_before) {
int base = table_s.schema()->min_compaction_threshold();
int64_t now = get_now(table_s.get_sstable_set().all());
auto most_interesting = get_compaction_candidates(table_s, non_expiring_sstables, now, base);
@@ -583,17 +582,17 @@ date_tiered_compaction_strategy::date_tiered_compaction_strategy(const std::map<
}
compaction_descriptor date_tiered_compaction_strategy::get_sstables_for_compaction(table_state& table_s, strategy_control& control, std::vector<sstables::shared_sstable> candidates) {
- auto compaction_time = gc_clock::now();
- auto sstables = _manifest.get_next_sstables(table_s, candidates, compaction_time);
+ auto gc_before = gc_clock::now() - table_s.schema()->gc_grace_seconds();
+ auto sstables = _manifest.get_next_sstables(table_s, candidates, gc_before);
if (!sstables.empty()) {
date_tiered_manifest::logger.debug("datetiered: Compacting {} out of {} sstables", sstables.size(), candidates.size());
return sstables::compaction_descriptor(std::move(sstables), table_s.get_sstable_set(), service::get_local_compaction_priority());
}
// filter out sstables which droppable tombstone ratio isn't greater than the defined threshold.
- auto e = boost::range::remove_if(candidates, [this, compaction_time] (const sstables::shared_sstable& sst) -> bool {
- return !worth_dropping_tombstones(sst, compaction_time);
+ auto e = boost::range::remove_if(candidates, [this, &gc_before] (const sstables::shared_sstable& sst) -> bool {
+ return !worth_dropping_tombstones(sst, gc_before);
});
candidates.erase(e, candidates.end());
if (candidates.empty()) {
diff --git a/compaction/compaction_strategy_impl.hh b/compaction/compaction_strategy_impl.hh
--- a/compaction/compaction_strategy_impl.hh
+++ b/compaction/compaction_strategy_impl.hh
@@ -74,7 +74,7 @@ public:
// Check if a given sstable is entitled for tombstone compaction based on its
// droppable tombstone histogram and gc_before.
- bool worth_dropping_tombstones(const shared_sstable& sst, gc_clock::time_point compaction_time);
+ bool worth_dropping_tombstones(const shared_sstable& sst, gc_clock::time_point gc_before);
virtual compaction_backlog_tracker& get_backlog_tracker() = 0;
diff --git a/compaction/date_tiered_compaction_strategy.hh b/compaction/date_tiered_compaction_strategy.hh
--- a/compaction/date_tiered_compaction_strategy.hh
+++ b/compaction/date_tiered_compaction_strategy.hh
@@ -112,12 +112,12 @@ public:
: _options(options) {}
std::vector<sstables::shared_sstable>
- get_next_sstables(table_state& table_s, std::vector<sstables::shared_sstable>& uncompacting, gc_clock::time_point compaction_time);
+ get_next_sstables(table_state& table_s, std::vector<sstables::shared_sstable>& uncompacting, gc_clock::time_point gc_before);
int64_t get_estimated_tasks(table_state& table_s) const;
private:
std::vector<sstables::shared_sstable>
- get_next_non_expired_sstables(table_state& table_s, std::vector<sstables::shared_sstable>& non_expiring_sstables, gc_clock::time_point compaction_time);
+ get_next_non_expired_sstables(table_state& table_s, std::vector<sstables::shared_sstable>& non_expiring_sstables, gc_clock::time_point gc_before);
std::vector<sstables::shared_sstable>
get_compaction_candidates(table_state& table_s, std::vector<sstables::shared_sstable> candidate_sstables, int64_t now, int base);
diff --git a/compaction/leveled_compaction_strategy.cc b/compaction/leveled_compaction_strategy.cc
--- a/compaction/leveled_compaction_strategy.cc
+++ b/compaction/leveled_compaction_strategy.cc
@@ -48,21 +48,19 @@ compaction_descriptor leveled_compaction_strategy::get_sstables_for_compaction(t
// unlike stcs, lcs can look for sstable with highest droppable tombstone ratio, so as not to choose
// a sstable which droppable data shadow data in older sstable, by starting from highest levels which
// theoretically contain oldest non-overlapping data.
- auto compaction_time = gc_clock::now();
+ auto gc_before = gc_clock::now() - table_s.schema()->gc_grace_seconds();
for (auto level = int(manifest.get_level_count()); level >= 0; level--) {
auto& sstables = manifest.get_level(level);
// filter out sstables which droppable tombstone ratio isn't greater than the defined threshold.
- auto e = boost::range::remove_if(sstables, [this, compaction_time] (const sstables::shared_sstable& sst) -> bool {
- return !worth_dropping_tombstones(sst, compaction_time);
+ auto e = boost::range::remove_if(sstables, [this, &gc_before] (const sstables::shared_sstable& sst) -> bool {
+ return !worth_dropping_tombstones(sst, gc_before);
});
sstables.erase(e, sstables.end());
if (sstables.empty()) {
continue;
}
auto& sst = *std::max_element(sstables.begin(), sstables.end(), [&] (auto& i, auto& j) {
- auto gc_before1 = i->get_gc_before_for_drop_estimation(compaction_time);
- auto gc_before2 = j->get_gc_before_for_drop_estimation(compaction_time);
- return i->estimate_droppable_tombstone_ratio(gc_before1) < j->estimate_droppable_tombstone_ratio(gc_before2);
+ return i->estimate_droppable_tombstone_ratio(gc_before) < j->estimate_droppable_tombstone_ratio(gc_before);
});
return sstables::compaction_descriptor({ sst }, table_s.get_sstable_set(), service::get_local_compaction_priority(), sst->get_sstable_level());
}
diff --git a/compaction/size_tiered_compaction_strategy.cc b/compaction/size_tiered_compaction_strategy.cc
--- a/compaction/size_tiered_compaction_strategy.cc
+++ b/compaction/size_tiered_compaction_strategy.cc
@@ -161,7 +161,7 @@ size_tiered_compaction_strategy::get_sstables_for_compaction(table_state& table_
// make local copies so they can't be changed out from under us mid-method
int min_threshold = table_s.min_compaction_threshold();
int max_threshold = table_s.schema()->max_compaction_threshold();
- auto compaction_time = gc_clock::now();
+ auto gc_before = gc_clock::now() - table_s.schema()->gc_grace_seconds();
// TODO: Add support to filter cold sstables (for reference: SizeTieredCompactionStrategy::filterColdSSTables).
@@ -184,8 +184,8 @@ size_tiered_compaction_strategy::get_sstables_for_compaction(table_state& table_
// tombstone purge, i.e. less likely to shadow even older data.
for (auto&& sstables : buckets | boost::adaptors::reversed) {
// filter out sstables which droppable tombstone ratio isn't greater than the defined threshold.
- auto e = boost::range::remove_if(sstables, [this, compaction_time] (const sstables::shared_sstable& sst) -> bool {
- return !worth_dropping_tombstones(sst, compaction_time);
+ auto e = boost::range::remove_if(sstables, [this, &gc_before] (const sstables::shared_sstable& sst) -> bool {
+ return !worth_dropping_tombstones(sst, gc_before);
});
sstables.erase(e, sstables.end());
if (sstables.empty()) {
diff --git a/compaction/table_state.hh b/compaction/table_state.hh
--- a/compaction/table_state.hh
+++ b/compaction/table_state.hh
@@ -42,7 +42,7 @@ public:
virtual unsigned min_compaction_threshold() const noexcept = 0;
virtual bool compaction_enforce_min_threshold() const noexcept = 0;
virtual const sstables::sstable_set& get_sstable_set() const = 0;
- virtual std::unordered_set<sstables::shared_sstable> fully_expired_sstables(const std::vector<sstables::shared_sstable>& sstables, gc_clock::time_point compaction_time) const = 0;
+ virtual std::unordered_set<sstables::shared_sstable> fully_expired_sstables(const std::vector<sstables::shared_sstable>& sstables) const = 0;
virtual const std::vector<sstables::shared_sstable>& compacted_undeleted_sstables() const noexcept = 0;
virtual sstables::compaction_strategy& get_compaction_strategy() const noexcept = 0;
virtual reader_permit make_compaction_reader_permit() const = 0;
diff --git a/compaction/time_window_compaction_strategy.cc b/compaction/time_window_compaction_strategy.cc
--- a/compaction/time_window_compaction_strategy.cc
+++ b/compaction/time_window_compaction_strategy.cc
@@ -227,7 +227,7 @@ time_window_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> i
compaction_descriptor
time_window_compaction_strategy::get_sstables_for_compaction(table_state& table_s, strategy_control& control, std::vector<shared_sstable> candidates) {
- auto compaction_time = gc_clock::now();
+ auto gc_before = gc_clock::now() - table_s.schema()->gc_grace_seconds();
if (candidates.empty()) {
return compaction_descriptor();
@@ -238,7 +238,7 @@ time_window_compaction_strategy::get_sstables_for_compaction(table_state& table_
if (db_clock::now() - _last_expired_check > _options.expired_sstable_check_frequency) {
clogger.debug("TWCS expired check sufficiently far in the past, checking for fully expired SSTables");
- expired = table_s.fully_expired_sstables(candidates, compaction_time);
+ expired = table_s.fully_expired_sstables(candidates);
_last_expired_check = db_clock::now();
} else {
clogger.debug("TWCS skipping check for fully expired SSTables");
@@ -249,7 +249,7 @@ time_window_compaction_strategy::get_sstables_for_compaction(table_state& table_
return compaction_descriptor(has_only_fully_expired::yes, std::vector<shared_sstable>(expired.begin(), expired.end()), table_s.get_sstable_set(), service::get_local_compaction_priority());
}
- auto compaction_candidates = get_next_non_expired_sstables(table_s, control, std::move(candidates), compaction_time);
+ auto compaction_candidates = get_next_non_expired_sstables(table_s, control, std::move(candidates), gc_before);
return compaction_descriptor(std::move(compaction_candidates), table_s.get_sstable_set(), service::get_local_compaction_priority());
}
@@ -270,7 +270,7 @@ time_window_compaction_strategy::compaction_mode(const bucket_t& bucket, timesta
std::vector<shared_sstable>
time_window_compaction_strategy::get_next_non_expired_sstables(table_state& table_s, strategy_control& control,
- std::vector<shared_sstable> non_expiring_sstables, gc_clock::time_point compaction_time) {
+ std::vector<shared_sstable> non_expiring_sstables, gc_clock::time_point gc_before) {
auto most_interesting = get_compaction_candidates(table_s, control, non_expiring_sstables);
if (!most_interesting.empty()) {
@@ -279,8 +279,8 @@ time_window_compaction_strategy::get_next_non_expired_sstables(table_state& tabl
// if there is no sstable to compact in standard way, try compacting single sstable whose droppable tombstone
// ratio is greater than threshold.
- auto e = boost::range::remove_if(non_expiring_sstables, [this, compaction_time] (const shared_sstable& sst) -> bool {
- return !worth_dropping_tombstones(sst, compaction_time);
+ auto e = boost::range::remove_if(non_expiring_sstables, [this, &gc_before] (const shared_sstable& sst) -> bool {
+ return !worth_dropping_tombstones(sst, gc_before);
});
non_expiring_sstables.erase(e, non_expiring_sstables.end());
if (non_expiring_sstables.empty()) {
diff --git a/compaction/time_window_compaction_strategy.hh b/compaction/time_window_compaction_strategy.hh
--- a/compaction/time_window_compaction_strategy.hh
+++ b/compaction/time_window_compaction_strategy.hh
@@ -139,7 +139,7 @@ private:
compaction_mode(const bucket_t& bucket, timestamp_type bucket_key, timestamp_type now, size_t min_threshold) const;
std::vector<shared_sstable>
- get_next_non_expired_sstables(table_state& table_s, strategy_control& control, std::vector<shared_sstable> non_expiring_sstables, gc_clock::time_point compaction_time);
+ get_next_non_expired_sstables(table_state& table_s, strategy_control& control, std::vector<shared_sstable> non_expiring_sstables, gc_clock::time_point gc_before);
std::vector<shared_sstable> get_compaction_candidates(table_state& table_s, strategy_control& control, std::vector<shared_sstable> candidate_sstables);
public:
diff --git a/configure.py b/configure.py
--- a/configure.py
+++ b/configure.py
@@ -989,8 +989,6 @@ def find_headers(repodir, excluded_dirs):
'table_helper.cc',
'range_tombstone.cc',
'range_tombstone_list.cc',
- 'tombstone_gc_options.cc',
- 'tombstone_gc.cc',
'utils/disk-error-handler.cc',
'duration.cc',
'vint-serialization.cc',
diff --git a/cql3/statements/alter_table_statement.cc b/cql3/statements/alter_table_statement.cc
--- a/cql3/statements/alter_table_statement.cc
+++ b/cql3/statements/alter_table_statement.cc
@@ -349,7 +349,7 @@ std::pair<schema_builder, std::vector<view_ptr>> alter_table_statement::prepare_
{
auto schema_extensions = _properties->make_schema_extensions(db.extensions());
- _properties->validate(db, keyspace(), schema_extensions);
+ _properties->validate(db, schema_extensions);
if (!cf.views().empty() && _properties->get_gc_grace_seconds() == 0) {
throw exceptions::invalid_request_exception(
diff --git a/cql3/statements/alter_view_statement.cc b/cql3/statements/alter_view_statement.cc
--- a/cql3/statements/alter_view_statement.cc
+++ b/cql3/statements/alter_view_statement.cc
@@ -90,7 +90,7 @@ view_ptr alter_view_statement::prepare_view(data_dictionary::database db) const
}
auto schema_extensions = _properties->make_schema_extensions(db.extensions());
- _properties->validate(db, keyspace(), schema_extensions);
+ _properties->validate(db, schema_extensions);
auto builder = schema_builder(schema);
_properties->apply_to_builder(builder, std::move(schema_extensions));
diff --git a/cql3/statements/cf_prop_defs.cc b/cql3/statements/cf_prop_defs.cc
--- a/cql3/statements/cf_prop_defs.cc
+++ b/cql3/statements/cf_prop_defs.cc
@@ -46,8 +46,6 @@
#include "cdc/cdc_extension.hh"
#include "gms/feature.hh"
#include "gms/feature_service.hh"
-#include "tombstone_gc_extension.hh"
-#include "tombstone_gc.hh"
#include <boost/algorithm/string/predicate.hpp>
@@ -96,7 +94,7 @@ schema::extensions_map cf_prop_defs::make_schema_extensions(const db::extensions
return er;
}
-void cf_prop_defs::validate(const data_dictionary::database db, sstring ks_name, const schema::extensions_map& schema_extensions) const {
+void cf_prop_defs::validate(const data_dictionary::database db, const schema::extensions_map& schema_extensions) const {
// Skip validation if the comapction strategy class is already set as it means we've alreayd
// prepared (and redoing it would set strategyClass back to null, which we don't want)
if (_compaction_strategy_class) {
@@ -158,9 +156,6 @@ void cf_prop_defs::validate(const data_dictionary::database db, sstring ks_name,
throw exceptions::configuration_exception("CDC not supported by the cluster");
}
- auto tombstone_gc_options = get_tombstone_gc_options(schema_extensions);
- validate_tombstone_gc_options(tombstone_gc_options, db.real_database(), ks_name);
-
validate_minimum_int(KW_DEFAULT_TIME_TO_LIVE, 0, DEFAULT_DEFAULT_TIME_TO_LIVE);
validate_minimum_int(KW_PAXOSGRACESECONDS, 0, DEFAULT_GC_GRACE_SECONDS);
@@ -240,16 +235,6 @@ const cdc::options* cf_prop_defs::get_cdc_options(const schema::extensions_map&
return &cdc_ext->get_options();
}
-const tombstone_gc_options* cf_prop_defs::get_tombstone_gc_options(const schema::extensions_map& schema_exts) const {
- auto it = schema_exts.find(tombstone_gc_extension::NAME);
- if (it == schema_exts.end()) {
- return nullptr;
- }
-
- auto ext = dynamic_pointer_cast<tombstone_gc_extension>(it->second);
- return &ext->get_options();
-}
-
void cf_prop_defs::apply_to_builder(schema_builder& builder, schema::extensions_map schema_extensions) const {
if (has_property(KW_COMMENT)) {
builder.set_comment(get_string(KW_COMMENT, ""));
diff --git a/cql3/statements/cf_prop_defs.hh b/cql3/statements/cf_prop_defs.hh
--- a/cql3/statements/cf_prop_defs.hh
+++ b/cql3/statements/cf_prop_defs.hh
@@ -51,8 +51,6 @@ namespace data_dictionary {
class database;
}
-class tombstone_gc_options;
-
namespace db {
class extensions;
}
@@ -103,12 +101,11 @@ public:
std::optional<sstables::compaction_strategy_type> get_compaction_strategy_class() const;
schema::extensions_map make_schema_extensions(const db::extensions& exts) const;
- void validate(const data_dictionary::database db, sstring ks_name, const schema::extensions_map& schema_extensions) const;
+ void validate(const data_dictionary::database db, const schema::extensions_map& schema_extensions) const;
std::map<sstring, sstring> get_compaction_type_options() const;
std::optional<std::map<sstring, sstring>> get_compression_options() const;
const cdc::options* get_cdc_options(const schema::extensions_map&) const;
std::optional<caching_options> get_caching_options() const;
- const tombstone_gc_options* get_tombstone_gc_options(const schema::extensions_map&) const;
#if 0
public CachingOptions getCachingOptions() throws SyntaxException, ConfigurationException
{
diff --git a/cql3/statements/cf_properties.hh b/cql3/statements/cf_properties.hh
--- a/cql3/statements/cf_properties.hh
+++ b/cql3/statements/cf_properties.hh
@@ -94,8 +94,8 @@ public:
_defined_ordering.emplace_back(alias, reversed);
}
- void validate(const data_dictionary::database db, sstring ks_name, const schema::extensions_map& schema_extensions) const {
- _properties->validate(db, std::move(ks_name), schema_extensions);
+ void validate(const data_dictionary::database db, const schema::extensions_map& schema_extensions) const {
+ _properties->validate(db, schema_extensions);
}
};
diff --git a/cql3/statements/create_table_statement.cc b/cql3/statements/create_table_statement.cc
--- a/cql3/statements/create_table_statement.cc
+++ b/cql3/statements/create_table_statement.cc
@@ -211,7 +211,7 @@ std::unique_ptr<prepared_statement> create_table_statement::raw_statement::prepa
throw exceptions::invalid_request_exception(format("Multiple definition of identifier {}", (*i)->text()));
}
- _properties.validate(db, keyspace(), _properties.properties()->make_schema_extensions(db.extensions()));
+ _properties.validate(db, _properties.properties()->make_schema_extensions(db.extensions()));
const bool has_default_ttl = _properties.properties()->get_default_time_to_live() > 0;
auto stmt = ::make_shared<create_table_statement>(*_cf_name, _properties.properties(), _if_not_exists, _static_columns, _properties.properties()->get_id());
diff --git a/cql3/statements/create_view_statement.cc b/cql3/statements/create_view_statement.cc
--- a/cql3/statements/create_view_statement.cc
+++ b/cql3/statements/create_view_statement.cc
@@ -154,7 +154,7 @@ view_ptr create_view_statement::prepare_view(data_dictionary::database db) const
// - make sure base_table gc_grace_seconds > 0
auto schema_extensions = _properties.properties()->make_schema_extensions(db.extensions());
- _properties.validate(db, keyspace(), schema_extensions);
+ _properties.validate(db, schema_extensions);
if (_properties.use_compact_storage()) {
throw exceptions::invalid_request_exception(format("Cannot use 'COMPACT STORAGE' when defining a materialized view"));
diff --git a/database.cc b/database.cc
--- a/database.cc
+++ b/database.cc
@@ -71,7 +71,6 @@
#include "locator/abstract_replication_strategy.hh"
#include "timeout_config.hh"
-#include "tombstone_gc.hh"
#include "data_dictionary/impl.hh"
@@ -941,7 +940,6 @@ future<> database::drop_column_family(const sstring& ks_name, const sstring& cf_
lw_shared_ptr<table> cf;
try {
cf = _column_families.at(uuid);
- drop_repair_history_map_for_table(uuid);
} catch (std::out_of_range&) {
on_internal_error(dblog, fmt::format("drop_column_family {}.{}: UUID={} not found", ks_name, cf_name, uuid));
}
@@ -2069,7 +2067,6 @@ future<> database::truncate(const keyspace& ks, column_family& cf, timestamp_fun
// call.
auto low_mark = cf.set_low_replay_position_mark();
- const auto uuid = cf.schema()->id();
return _compaction_manager->run_with_compaction_disabled(&cf, [this, &cf, should_flush, auto_snapshot, tsf = std::move(tsf), low_mark]() mutable {
future<> f = make_ready_future<>();
@@ -2115,8 +2112,6 @@ future<> database::truncate(const keyspace& ks, column_family& cf, timestamp_fun
});
});
});
- }).then([this, uuid] {
- drop_repair_history_map_for_table(uuid);
});
});
}
diff --git a/db/config.cc b/db/config.cc
--- a/db/config.cc
+++ b/db/config.cc
@@ -35,7 +35,6 @@
#include <seastar/net/tls.hh>
#include "cdc/cdc_extension.hh"
-#include "tombstone_gc_extension.hh"
#include "config.hh"
#include "extensions.hh"
#include "log.hh"
diff --git a/db/schema_tables.cc b/db/schema_tables.cc
--- a/db/schema_tables.cc
+++ b/db/schema_tables.cc
@@ -958,7 +958,7 @@ mutation compact_for_schema_digest(const mutation& m) {
// See https://issues.apache.org/jira/browse/CASSANDRA-6862.
// We achieve similar effect with compact_for_compaction().
mutation m_compacted(m);
- m_compacted.partition().compact_for_compaction_drop_tombstones_unconditionally(*m.schema(), m.decorated_key());
+ m_compacted.partition().compact_for_compaction(*m.schema(), always_gc, gc_clock::time_point::max());
return m_compacted;
}
diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc
--- a/db/system_keyspace.cc
+++ b/db/system_keyspace.cc
@@ -288,26 +288,6 @@ schema_ptr system_keyspace::raft_config() {
return schema;
}
-schema_ptr system_keyspace::repair_history() {
- static thread_local auto schema = [] {
- auto id = generate_legacy_id(NAME, REPAIR_HISTORY);
- return schema_builder(NAME, REPAIR_HISTORY, std::optional(id))
- .with_column("table_uuid", uuid_type, column_kind::partition_key)
- // The time is repair start time
- .with_column("repair_time", timestamp_type, column_kind::clustering_key)
- .with_column("repair_uuid", uuid_type, column_kind::clustering_key)
- // The token range is (range_start, range_end]
- .with_column("range_start", long_type, column_kind::clustering_key)
- .with_column("range_end", long_type, column_kind::clustering_key)
- .with_column("keyspace_name", utf8_type, column_kind::static_column)
- .with_column("table_name", utf8_type, column_kind::static_column)
- .set_comment("Record repair history")
- .with_version(generate_schema_version(id))
- .build();
- }();
- return schema;
-}
-
schema_ptr system_keyspace::built_indexes() {
static thread_local auto built_indexes = [] {
schema_builder builder(generate_legacy_id(NAME, BUILT_INDEXES), NAME, BUILT_INDEXES,
@@ -2538,7 +2518,6 @@ std::vector<schema_ptr> system_keyspace::all_tables(const db::config& cfg) {
compactions_in_progress(), compaction_history(),
sstable_activity(), clients(), size_estimates(), large_partitions(), large_rows(), large_cells(),
scylla_local(), db::schema_tables::scylla_table_schema_history(),
- repair_history(),
v3::views_builds_in_progress(), v3::built_views(),
v3::scylla_views_builds_in_progress(),
v3::truncated(),
diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh
--- a/db/system_keyspace.hh
+++ b/db/system_keyspace.hh
@@ -149,7 +149,6 @@ public:
static constexpr auto RAFT = "raft";
static constexpr auto RAFT_SNAPSHOTS = "raft_snapshots";
static constexpr auto RAFT_CONFIG = "raft_config";
- static constexpr auto REPAIR_HISTORY = "repair_history";
static const char *const CLIENTS;
struct v3 {
@@ -231,7 +230,6 @@ public:
static schema_ptr built_indexes(); // TODO (from Cassandra): make private
static schema_ptr raft();
static schema_ptr raft_snapshots();
- static schema_ptr repair_history();
static table_schema_version generate_schema_version(utils::UUID table_id, uint16_t offset = 0);
diff --git a/db/view/view.cc b/db/view/view.cc
--- a/db/view/view.cc
+++ b/db/view/view.cc
@@ -995,8 +995,7 @@ void view_update_builder::generate_update(clustering_row&& update, std::optional
throw std::logic_error("Empty materialized view updated");
}
- auto dk = dht::decorate_key(*_schema, _key);
- auto gc_before = ::get_gc_before_for_key(_schema, dk, _now);
+ auto gc_before = _now - _schema->gc_grace_seconds();
// We allow existing to be disengaged, which we treat the same as an empty row.
if (existing) {
diff --git a/gms/feature.hh b/gms/feature.hh
--- a/gms/feature.hh
+++ b/gms/feature.hh
@@ -151,7 +151,6 @@ extern const std::string_view UDA;
extern const std::string_view SEPARATE_PAGE_SIZE_AND_SAFETY_LIMIT;
extern const std::string_view SUPPORTS_RAFT_CLUSTER_MANAGEMENT;
extern const std::string_view USES_RAFT_CLUSTER_MANAGEMENT;
-extern const std::string_view TOMBSTONE_GC_OPTIONS;
}
diff --git a/gms/feature_service.cc b/gms/feature_service.cc
--- a/gms/feature_service.cc
+++ b/gms/feature_service.cc
@@ -76,7 +76,6 @@ constexpr std::string_view features::UDA = "UDA";
constexpr std::string_view features::SEPARATE_PAGE_SIZE_AND_SAFETY_LIMIT = "SEPARATE_PAGE_SIZE_AND_SAFETY_LIMIT";
constexpr std::string_view features::SUPPORTS_RAFT_CLUSTER_MANAGEMENT = "SUPPORTS_RAFT_CLUSTER_MANAGEMENT";
constexpr std::string_view features::USES_RAFT_CLUSTER_MANAGEMENT = "USES_RAFT_CLUSTER_MANAGEMENT";
-constexpr std::string_view features::TOMBSTONE_GC_OPTIONS = "TOMBSTONE_GC_OPTIONS";
static logging::logger logger("features");
@@ -105,7 +104,6 @@ feature_service::feature_service(feature_config cfg) : _config(cfg)
, _separate_page_size_and_safety_limit(*this, features::SEPARATE_PAGE_SIZE_AND_SAFETY_LIMIT)
, _supports_raft_cluster_mgmt(*this, features::SUPPORTS_RAFT_CLUSTER_MANAGEMENT)
, _uses_raft_cluster_mgmt(*this, features::USES_RAFT_CLUSTER_MANAGEMENT)
- , _tombstone_gc_options(*this, features::TOMBSTONE_GC_OPTIONS)
, _raft_support_listener(_supports_raft_cluster_mgmt.when_enabled([this] {
// When the cluster fully supports raft-based cluster management,
// we can re-enable support for the second gossip feature to trigger
@@ -229,7 +227,6 @@ std::set<std::string_view> feature_service::known_feature_set() {
gms::features::SEPARATE_PAGE_SIZE_AND_SAFETY_LIMIT,
gms::features::SUPPORTS_RAFT_CLUSTER_MANAGEMENT,
gms::features::USES_RAFT_CLUSTER_MANAGEMENT,
- gms::features::TOMBSTONE_GC_OPTIONS,
};
for (const sstring& s : _config._disabled_features) {
@@ -338,7 +335,6 @@ void feature_service::enable(const std::set<std::string_view>& list) {
std::ref(_separate_page_size_and_safety_limit),
std::ref(_supports_raft_cluster_mgmt),
std::ref(_uses_raft_cluster_mgmt),
- std::ref(_tombstone_gc_options),
})
{
if (list.contains(f.name())) {
diff --git a/gms/feature_service.hh b/gms/feature_service.hh
--- a/gms/feature_service.hh
+++ b/gms/feature_service.hh
@@ -103,7 +103,6 @@ private:
gms::feature _separate_page_size_and_safety_limit;
gms::feature _supports_raft_cluster_mgmt;
gms::feature _uses_raft_cluster_mgmt;
- gms::feature _tombstone_gc_options;
gms::feature::listener_registration _raft_support_listener;
@@ -200,10 +199,6 @@ public:
return bool(_separate_page_size_and_safety_limit);
}
- bool cluster_supports_tombstone_gc_options() const {
- return bool(_tombstone_gc_options);
- }
-
static std::set<sstring> to_feature_set(sstring features_string);
// Persist enabled feature in the `system.scylla_local` table under the "enabled_features" key.
// The key itself is maintained as an `unordered_set<string>` and serialized via `to_string`
diff --git a/idl/partition_checksum.idl.hh b/idl/partition_checksum.idl.hh
--- a/idl/partition_checksum.idl.hh
+++ b/idl/partition_checksum.idl.hh
@@ -142,28 +142,3 @@ struct node_ops_cmd_response {
// Optional field, set by query_pending_ops cmd
std::list<utils::UUID> pending_ops;
};
-
-struct repair_update_system_table_request {
- utils::UUID repair_uuid;
- utils::UUID table_uuid;
- sstring keyspace_name;
- sstring table_name;
- dht::token_range range;
- gc_clock::time_point repair_time;
-};
-
-struct repair_update_system_table_response {
-};
-
-struct repair_flush_hints_batchlog_request {
- utils::UUID repair_uuid;
- std::list<gms::inet_address> target_nodes;
- std::chrono::seconds hints_timeout;
- std::chrono::seconds batchlog_timeout;
-};
-
-struct repair_flush_hints_batchlog_response {
-};
-
-verb [[with_client_info]] repair_update_system_table (repair_update_system_table_request) -> repair_update_system_table_response;
-verb [[with_client_info]] repair_flush_hints_batchlog (repair_flush_hints_batchlog_request) -> repair_flush_hints_batchlog_response;
diff --git a/main.cc b/main.cc
--- a/main.cc
+++ b/main.cc
@@ -90,7 +90,6 @@
#include "cdc/log.hh"
#include "cdc/cdc_extension.hh"
#include "cdc/generation_service.hh"
-#include "tombstone_gc_extension.hh"
#include "alternator/tags_extension.hh"
#include "db/paxos_grace_seconds_extension.hh"
#include "service/qos/standard_service_level_distributed_data_accessor.hh"
@@ -438,7 +437,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
ext->add_schema_extension<alternator::tags_extension>(alternator::tags_extension::NAME);
ext->add_schema_extension<cdc::cdc_extension>(cdc::cdc_extension::NAME);
ext->add_schema_extension<db::paxos_grace_seconds_extension>(db::paxos_grace_seconds_extension::NAME);
- ext->add_schema_extension<tombstone_gc_extension>(tombstone_gc_extension::NAME);
auto cfg = make_lw_shared<db::config>(ext);
auto init = app.get_options_description().add_options();
@@ -1104,7 +1102,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
// both)
supervisor::notify("starting messaging service");
auto max_memory_repair = memory::stats().total_memory() * 0.1;
- repair.start(std::ref(gossiper), std::ref(messaging), std::ref(db), std::ref(proxy), std::ref(bm), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(mm), max_memory_repair).get();
+ repair.start(std::ref(gossiper), std::ref(messaging), std::ref(db), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(mm), max_memory_repair).get();
auto stop_repair_service = defer_verbose_shutdown("repair service", [&repair] {
repair.stop().get();
});
diff --git a/message/messaging_service.cc b/message/messaging_service.cc
--- a/message/messaging_service.cc
+++ b/message/messaging_service.cc
@@ -81,6 +81,7 @@
#include "idl/gossip_digest.dist.impl.hh"
#include "idl/read_command.dist.impl.hh"
#include "idl/range.dist.impl.hh"
+#include "idl/partition_checksum.dist.impl.hh"
#include "idl/query.dist.impl.hh"
#include "idl/cache_temperature.dist.impl.hh"
#include "idl/mutation.dist.impl.hh"
@@ -102,7 +103,6 @@
#include "locator/snitch_base.hh"
#include "message/rpc_protocol_impl.hh"
-#include "idl/partition_checksum.dist.impl.hh"
namespace netw {
@@ -469,8 +469,6 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
case messaging_verb::REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM:
case messaging_verb::REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM:
case messaging_verb::REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM:
- case messaging_verb::REPAIR_UPDATE_SYSTEM_TABLE:
- case messaging_verb::REPAIR_FLUSH_HINTS_BATCHLOG:
case messaging_verb::NODE_OPS_CMD:
case messaging_verb::HINT_MUTATION:
return 1;
diff --git a/message/messaging_service.hh b/message/messaging_service.hh
--- a/message/messaging_service.hh
+++ b/message/messaging_service.hh
@@ -161,9 +161,7 @@ enum class messaging_verb : int32_t {
RAFT_MODIFY_CONFIG = 56,
GROUP0_PEER_EXCHANGE = 57,
GROUP0_MODIFY_CONFIG = 58,
- REPAIR_UPDATE_SYSTEM_TABLE = 59,
- REPAIR_FLUSH_HINTS_BATCHLOG = 60,
- LAST = 61,
+ LAST = 59,
};
} // namespace netw
diff --git a/mutation_compactor.hh b/mutation_compactor.hh
--- a/mutation_compactor.hh
+++ b/mutation_compactor.hh
@@ -23,7 +23,6 @@
#include "compaction/compaction_garbage_collector.hh"
#include "mutation_fragment.hh"
-#include "tombstone_gc.hh"
static inline bool has_ck_selector(const query::clustering_row_ranges& ranges) {
// Like PK range, an empty row range, should be considered an "exclude all" restriction
@@ -151,10 +150,10 @@ template<emit_only_live_rows OnlyLive, compact_for_sstables SSTableCompaction>
class compact_mutation_state {
const schema& _schema;
gc_clock::time_point _query_time;
+ gc_clock::time_point _gc_before;
std::function<api::timestamp_type(const dht::decorated_key&)> _get_max_purgeable;
can_gc_fn _can_gc;
api::timestamp_type _max_purgeable = api::missing_timestamp;
- std::optional<gc_clock::time_point> _gc_before;
const query::partition_slice& _slice;
uint64_t _row_limit{};
uint32_t _partition_limit{};
@@ -210,26 +209,13 @@ private:
}
bool can_purge_tombstone(const tombstone& t) {
- return can_gc(t) && t.deletion_time < get_gc_before();
+ return t.deletion_time < _gc_before && can_gc(t);
};
bool can_purge_tombstone(const row_tombstone& t) {
- return can_gc(t.tomb()) && t.max_deletion_time() < get_gc_before();
+ return t.max_deletion_time() < _gc_before && can_gc(t.tomb());
};
- gc_clock::time_point get_gc_before() {
- if (_gc_before) {
- return _gc_before.value();
- } else {
- if (_dk) {
- _gc_before = ::get_gc_before_for_key(_schema.shared_from_this(), *_dk, _query_time);
- return _gc_before.value();
- } else {
- return gc_clock::time_point::min();
- }
- }
- }
-
bool can_gc(tombstone t) {
if (!sstable_compaction()) {
return true;
@@ -255,6 +241,7 @@ public:
uint32_t partition_limit)
: _schema(s)
, _query_time(query_time)
+ , _gc_before(saturating_subtract(query_time, s.gc_grace_seconds()))
, _can_gc(always_gc)
, _slice(slice)
, _row_limit(limit)
@@ -270,6 +257,7 @@ public:
std::function<api::timestamp_type(const dht::decorated_key&)> get_max_purgeable)
: _schema(s)
, _query_time(compaction_time)
+ , _gc_before(saturating_subtract(_query_time, s.gc_grace_seconds()))
, _get_max_purgeable(std::move(get_max_purgeable))
, _can_gc([this] (tombstone t) { return can_gc(t); })
, _slice(s.full_slice())
@@ -294,7 +282,6 @@ public:
_range_tombstones.clear();
_current_partition_limit = std::min(_row_limit, _partition_row_limit);
_max_purgeable = api::missing_timestamp;
- _gc_before = std::nullopt;
_last_static_row.reset();
}
@@ -319,9 +306,8 @@ public:
if constexpr (sstable_compaction()) {
_collector->start_collecting_static_row();
}
- auto gc_before = get_gc_before();
bool is_live = sr.cells().compact_and_expire(_schema, column_kind::static_column, row_tombstone(current_tombstone),
- _query_time, _can_gc, gc_before, _collector.get());
+ _query_time, _can_gc, _gc_before, _collector.get());
_stats.static_rows += is_live;
if constexpr (sstable_compaction()) {
_collector->consume_static_row([this, &gc_consumer, current_tombstone] (static_row&& sr_garbage) {
@@ -364,9 +350,9 @@ public:
cr.remove_tombstone();
}
}
- auto gc_before = get_gc_before();
- bool is_live = cr.marker().compact_and_expire(t.tomb(), _query_time, _can_gc, gc_before, _collector.get());
- is_live |= cr.cells().compact_and_expire(_schema, column_kind::regular_column, t, _query_time, _can_gc, gc_before, cr.marker(),
+
+ bool is_live = cr.marker().compact_and_expire(t.tomb(), _query_time, _can_gc, _gc_before, _collector.get());
+ is_live |= cr.cells().compact_and_expire(_schema, column_kind::regular_column, t, _query_time, _can_gc, _gc_before, cr.marker(),
_collector.get());
_stats.clustering_rows += is_live;
@@ -484,6 +470,7 @@ public:
_rows_in_current_partition = 0;
_current_partition_limit = std::min(_row_limit, _partition_row_limit);
_query_time = query_time;
+ _gc_before = saturating_subtract(query_time, _schema.gc_grace_seconds());
_stats = {};
if ((next_fragment_kind == mutation_fragment::kind::clustering_row || next_fragment_kind == mutation_fragment::kind::range_tombstone)
@@ -530,8 +517,7 @@ public:
}
compact_mutation(const schema& s, gc_clock::time_point compaction_time,
- std::function<api::timestamp_type(const dht::decorated_key&)> get_max_purgeable,
- Consumer consumer, GCConsumer gc_consumer = GCConsumer())
+ std::function<api::timestamp_type(const dht::decorated_key&)> get_max_purgeable, Consumer consumer, GCConsumer gc_consumer = GCConsumer())
: _state(make_lw_shared<compact_mutation_state<OnlyLive, SSTableCompaction>>(s, compaction_time, get_max_purgeable))
, _consumer(std::move(consumer))
, _gc_consumer(std::move(gc_consumer)) {
diff --git a/mutation_partition.cc b/mutation_partition.cc
--- a/mutation_partition.cc
+++ b/mutation_partition.cc
@@ -41,7 +41,6 @@
#include "utils/exceptions.hh"
#include "clustering_key_filter.hh"
#include "mutation_partition_view.hh"
-#include "tombstone_gc.hh"
logging::logger mplog("mutation_partition");
@@ -1295,20 +1294,17 @@ void mutation_partition::trim_rows(const schema& s,
}
uint32_t mutation_partition::do_compact(const schema& s,
- const dht::decorated_key& dk,
gc_clock::time_point query_time,
const std::vector<query::clustering_range>& row_ranges,
bool always_return_static_content,
bool reverse,
uint64_t row_limit,
- can_gc_fn& can_gc,
- bool drop_tombstones_unconditionally)
+ can_gc_fn& can_gc)
{
check_schema(s);
assert(row_limit > 0);
- auto gc_before = drop_tombstones_unconditionally ? gc_clock::time_point::max() :
- ::get_gc_before_for_key(s.shared_from_this(), dk, query_time);
+ auto gc_before = saturating_subtract(query_time, s.gc_grace_seconds());
auto should_purge_tombstone = [&] (const tombstone& t) {
return t.deletion_time < gc_before && can_gc(t);
@@ -1367,39 +1363,25 @@ uint32_t mutation_partition::do_compact(const schema& s,
uint64_t
mutation_partition::compact_for_query(
const schema& s,
- const dht::decorated_key& dk,
gc_clock::time_point query_time,
const std::vector<query::clustering_range>& row_ranges,
bool always_return_static_content,
bool reverse,
uint64_t row_limit)
{
check_schema(s);
- bool drop_tombstones_unconditionally = false;
- return do_compact(s, dk, query_time, row_ranges, always_return_static_content, reverse, row_limit, always_gc, drop_tombstones_unconditionally);
+ return do_compact(s, query_time, row_ranges, always_return_static_content, reverse, row_limit, always_gc);
}
void mutation_partition::compact_for_compaction(const schema& s,
- can_gc_fn& can_gc, const dht::decorated_key& dk, gc_clock::time_point compaction_time)
+ can_gc_fn& can_gc, gc_clock::time_point compaction_time)
{
check_schema(s);
static const std::vector<query::clustering_range> all_rows = {
query::clustering_range::make_open_ended_both_sides()
};
- bool drop_tombstones_unconditionally = false;
- do_compact(s, dk, compaction_time, all_rows, true, false, query::partition_max_rows, can_gc, drop_tombstones_unconditionally);
-}
-
-void mutation_partition::compact_for_compaction_drop_tombstones_unconditionally(const schema& s, const dht::decorated_key& dk)
-{
- check_schema(s);
- static const std::vector<query::clustering_range> all_rows = {
- query::clustering_range::make_open_ended_both_sides()
- };
- bool drop_tombstones_unconditionally = true;
- auto compaction_time = gc_clock::time_point::max();
- do_compact(s, dk, compaction_time, all_rows, true, false, query::partition_max_rows, always_gc, drop_tombstones_unconditionally);
+ do_compact(s, compaction_time, all_rows, true, false, query::partition_max_rows, can_gc);
}
// Returns true if the mutation_partition represents no writes.
diff --git a/mutation_partition.hh b/mutation_partition.hh
--- a/mutation_partition.hh
+++ b/mutation_partition.hh
@@ -1241,14 +1241,12 @@ private:
void insert_row(const schema& s, const clustering_key& key, const deletable_row& row);
uint32_t do_compact(const schema& s,
- const dht::decorated_key& dk,
gc_clock::time_point now,
const std::vector<query::clustering_range>& row_ranges,
bool always_return_static_content,
bool reverse,
uint64_t row_limit,
- can_gc_fn&,
- bool drop_tombstones_unconditionally);
+ can_gc_fn&);
// Calls func for each row entry inside row_ranges until func returns stop_iteration::yes.
// Removes all entries for which func didn't return stop_iteration::no or wasn't called at all.
@@ -1276,7 +1274,7 @@ public:
//
// The row_limit parameter must be > 0.
//
- uint64_t compact_for_query(const schema& s, const dht::decorated_key& dk, gc_clock::time_point query_time,
+ uint64_t compact_for_query(const schema& s, gc_clock::time_point query_time,
const std::vector<query::clustering_range>& row_ranges, bool always_return_static_content,
bool reversed, uint64_t row_limit);
@@ -1285,13 +1283,8 @@ public:
// - drops cells covered by higher-level tombstones
// - drops expired tombstones which timestamp is before max_purgeable
void compact_for_compaction(const schema& s, can_gc_fn&,
- const dht::decorated_key& dk,
gc_clock::time_point compaction_time);
- // Like compact_for_compaction but drop tombstones unconditionally
- void compact_for_compaction_drop_tombstones_unconditionally(const schema& s,
- const dht::decorated_key& dk);
-
// Returns the minimal mutation_partition that when applied to "other" will
// create a mutation_partition equal to the sum of other and this one.
// This and other must both be governed by the same schema s.
diff --git a/repair/repair.cc b/repair/repair.cc
--- a/repair/repair.cc
+++ b/repair/repair.cc
@@ -55,8 +55,6 @@
#include <cfloat>
-#include "idl/partition_checksum.dist.hh"
-
logging::logger rlogger("repair");
void node_ops_info::check_abort() {
@@ -315,7 +313,7 @@ static std::vector<gms::inet_address> get_neighbors(database& db,
#endif
}
-static future<std::list<gms::inet_address>> get_hosts_participating_in_repair(database& db,
+static future<std::vector<gms::inet_address>> get_hosts_participating_in_repair(database& db,
const sstring& ksname,
const dht::token_range_vector& ranges,
const std::vector<sstring>& data_centers,
@@ -335,7 +333,7 @@ static future<std::list<gms::inet_address>> get_hosts_participating_in_repair(da
}
});
- co_return std::list<gms::inet_address>(participating_hosts.begin(), participating_hosts.end());
+ co_return std::vector<gms::inet_address>(participating_hosts.begin(), participating_hosts.end());
}
static tracker* _the_tracker = nullptr;
@@ -593,10 +591,8 @@ repair_info::repair_info(repair_service& repair,
const std::vector<sstring>& hosts_,
const std::unordered_set<gms::inet_address>& ignore_nodes_,
streaming::stream_reason reason_,
- std::optional<utils::UUID> ops_uuid,
- bool hints_batchlog_flushed)
- : rs(repair)
- , db(repair.get_db())
+ std::optional<utils::UUID> ops_uuid)
+ : db(repair.get_db())
, messaging(repair.get_messaging().container())
, sys_dist_ks(repair.get_sys_dist_ks())
, view_update_generator(repair.get_view_update_generator())
@@ -613,10 +609,8 @@ repair_info::repair_info(repair_service& repair,
, hosts(hosts_)
, ignore_nodes(ignore_nodes_)
, reason(reason_)
- , total_rf(db.local().find_keyspace(keyspace).get_effective_replication_map()->get_replication_factor())
, nr_ranges_total(ranges.size())
- , _ops_uuid(std::move(ops_uuid))
- , _hints_batchlog_flushed(std::move(hints_batchlog_flushed)) {
+ , _ops_uuid(std::move(ops_uuid)) {
}
void repair_info::check_failed_ranges() {
@@ -1126,41 +1120,12 @@ int repair_service::do_repair_start(sstring keyspace, std::unordered_map<sstring
// Do it in the background.
(void)repair_tracker().run(id, [this, &db, id, keyspace = std::move(keyspace),
cfs = std::move(cfs), ranges = std::move(ranges), options = std::move(options), ignore_nodes = std::move(ignore_nodes)] () mutable {
- auto uuid = id.uuid;
-
- auto waiting_nodes = db.local().get_token_metadata().get_all_endpoints();
- std::erase_if(waiting_nodes, [&] (const auto& addr) {
- return ignore_nodes.contains(addr);
- });
auto participants = get_hosts_participating_in_repair(db.local(), keyspace, ranges, options.data_centers, options.hosts, ignore_nodes).get();
- auto hints_timeout = std::chrono::seconds(300);
- auto batchlog_timeout = std::chrono::seconds(300);
- repair_flush_hints_batchlog_request req{id.uuid, participants, hints_timeout, batchlog_timeout};
-
- bool hints_batchlog_flushed = false;
- try {
- parallel_for_each(waiting_nodes, [this, uuid, &req, &participants] (gms::inet_address node) -> future<> {
- rlogger.info("repair[{}]: Sending repair_flush_hints_batchlog to node={}, participants={}, started",
- uuid, node, participants);
- try {
- auto& ms = get_messaging();
- auto resp = co_await ser::partition_checksum_rpc_verbs::send_repair_flush_hints_batchlog(&ms, netw::msg_addr(node), req);
- } catch (...) {
- rlogger.warn("repair[{}]: Sending repair_flush_hints_batchlog to node={}, participants={}, failed: {}",
- uuid, node, participants, std::current_exception());
- throw;
- }
- }).get();
- hints_batchlog_flushed = true;
- } catch (...) {
- rlogger.warn("repair[{}]: Sending repair_flush_hints_batchlog to participants={} failed, continue to run repair",
- uuid, participants);
- }
-
std::vector<future<>> repair_results;
repair_results.reserve(smp::count);
auto table_ids = get_table_ids(db.local(), keyspace, cfs);
abort_source as;
+ auto uuid = id.uuid;
auto off_strategy_updater = seastar::async([this, uuid, &table_ids, &participants, &as] {
auto tables = std::list<utils::UUID>(table_ids.begin(), table_ids.end());
auto req = node_ops_cmd_request(node_ops_cmd::repair_updater, uuid, {}, {}, {}, {}, std::move(tables));
@@ -1190,21 +1155,13 @@ int repair_service::do_repair_start(sstring keyspace, std::unordered_map<sstring
rlogger.info("repair[{}]: Finished to shutdown off-strategy compaction updater", uuid);
});
- auto cleanup_repair_range_history = defer([this, uuid] () mutable {
- try {
- this->cleanup_history(uuid).get();
- } catch (...) {
- rlogger.warn("repair[{}]: Failed to cleanup history: {}", uuid, std::current_exception());
- }
- });
-
for (auto shard : boost::irange(unsigned(0), smp::count)) {
- auto f = container().invoke_on(shard, [keyspace, table_ids, id, ranges, hints_batchlog_flushed,
+ auto f = container().invoke_on(shard, [keyspace, table_ids, id, ranges,
data_centers = options.data_centers, hosts = options.hosts, ignore_nodes] (repair_service& local_repair) mutable {
_node_ops_metrics.repair_total_ranges_sum += ranges.size();
auto ri = make_lw_shared<repair_info>(local_repair,
std::move(keyspace), std::move(ranges), std::move(table_ids),
- id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), streaming::stream_reason::repair, id.uuid, hints_batchlog_flushed);
+ id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), streaming::stream_reason::repair, id.uuid);
return repair_ranges(ri);
});
repair_results.push_back(std::move(f));
@@ -1306,10 +1263,9 @@ future<> repair_service::do_sync_data_using_repair(
auto data_centers = std::vector<sstring>();
auto hosts = std::vector<sstring>();
auto ignore_nodes = std::unordered_set<gms::inet_address>();
- bool hints_batchlog_flushed = false;
auto ri = make_lw_shared<repair_info>(local_repair,
std::move(keyspace), std::move(ranges), std::move(table_ids),
- id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), reason, ops_uuid, hints_batchlog_flushed);
+ id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), reason, ops_uuid);
ri->neighbors = std::move(neighbors);
return repair_ranges(ri);
});
diff --git a/repair/repair.hh b/repair/repair.hh
--- a/repair/repair.hh
+++ b/repair/repair.hh
@@ -168,7 +168,6 @@ public:
class repair_info {
public:
- repair_service& rs;
seastar::sharded<database>& db;
seastar::sharded<netw::messaging_service>& messaging;
sharded<db::system_distributed_keyspace>& sys_dist_ks;
@@ -187,7 +186,6 @@ public:
std::unordered_set<gms::inet_address> ignore_nodes;
streaming::stream_reason reason;
std::unordered_map<dht::token_range, repair_neighbors> neighbors;
- size_t total_rf;
uint64_t nr_ranges_finished = 0;
uint64_t nr_ranges_total;
size_t nr_failed_ranges = 0;
@@ -196,7 +194,6 @@ public:
repair_stats _stats;
std::unordered_set<sstring> dropped_tables;
std::optional<utils::UUID> _ops_uuid;
- bool _hints_batchlog_flushed = false;
public:
repair_info(repair_service& repair,
const sstring& keyspace_,
@@ -207,8 +204,7 @@ public:
const std::vector<sstring>& hosts_,
const std::unordered_set<gms::inet_address>& ingore_nodes_,
streaming::stream_reason reason_,
- std::optional<utils::UUID> ops_uuid,
- bool hints_batchlog_flushed);
+ std::optional<utils::UUID> ops_uuid);
void check_failed_ranges();
void abort();
void check_in_abort();
@@ -223,10 +219,6 @@ public:
return _ops_uuid;
};
- bool hints_batchlog_flushed() const {
- return _hints_batchlog_flushed;
- }
-
future<> repair_range(const dht::token_range& range);
};
@@ -499,29 +491,6 @@ struct node_ops_cmd_response {
}
};
-
-struct repair_update_system_table_request {
- utils::UUID repair_uuid;
- utils::UUID table_uuid;
- sstring keyspace_name;
- sstring table_name;
- dht::token_range range;
- gc_clock::time_point repair_time;
-};
-
-struct repair_update_system_table_response {
-};
-
-struct repair_flush_hints_batchlog_request {
- utils::UUID repair_uuid;
- std::list<gms::inet_address> target_nodes;
- std::chrono::seconds hints_timeout;
- std::chrono::seconds batchlog_timeout;
-};
-
-struct repair_flush_hints_batchlog_response {
-};
-
namespace std {
template<>
diff --git a/repair/row_level.cc b/repair/row_level.cc
--- a/repair/row_level.cc
+++ b/repair/row_level.cc
@@ -52,13 +52,6 @@
#include "service/migration_manager.hh"
#include "streaming/consumer.hh"
#include <seastar/core/coroutine.hh>
-#include <seastar/coroutine/all.hh>
-#include "db/query_context.hh"
-#include "db/system_keyspace.hh"
-#include "service/storage_proxy.hh"
-#include "db/batchlog_manager.hh"
-#include "cql3/untyped_result_set.hh"
-#include "idl/partition_checksum.dist.hh"
extern logging::logger rlogger;
@@ -2273,66 +2266,6 @@ static future<> repair_get_full_row_hashes_with_rpc_stream_handler(
});
}
-future<repair_update_system_table_response> repair_service::repair_update_system_table_handler(gms::inet_address from, repair_update_system_table_request req) {
- rlogger.debug("repair[{}]: Got repair_update_system_table_request from node={}, range={}, repair_time={}", req.repair_uuid, from, req.range, req.repair_time);
- auto& db = this->get_db();
- bool is_valid_range = true;
- if (req.range.start()) {
- if (req.range.start()->is_inclusive()) {
- is_valid_range = false;
- }
- }
- if (req.range.end()) {
- if (!req.range.end()->is_inclusive()) {
- is_valid_range = false;
- }
- }
- if (!is_valid_range) {
- throw std::runtime_error(format("repair[{}]: range {} is not in the format of (start, end]", req.repair_uuid, req.range));
- }
- co_await db.invoke_on_all([&req] (database& local_db) {
- auto& table = local_db.find_column_family(req.table_uuid);
- return ::update_repair_time(table.schema(), req.range, req.repair_time);
- });
- sstring cql = format("INSERT INTO system.{} (table_uuid, repair_time, repair_uuid, keyspace_name, table_name, range_start, range_end) VALUES (?, ?, ?, ?, ?, ?, ?)",
- db::system_keyspace::REPAIR_HISTORY);
- auto range_start = req.range.start() ? req.range.start()->value() : dht::minimum_token();
- auto range_end = req.range.end() ? req.range.end()->value() : dht::maximum_token();
- db_clock::time_point ts = db_clock::from_time_t(gc_clock::to_time_t(req.repair_time));
- co_await db::qctx->execute_cql(cql, req.table_uuid, ts, req.repair_uuid, req.keyspace_name, req.table_name,
- dht::token::to_int64(range_start), dht::token::to_int64(range_end)).discard_result();
- co_return repair_update_system_table_response();
-}
-
-future<repair_flush_hints_batchlog_response> repair_service::repair_flush_hints_batchlog_handler(gms::inet_address from, repair_flush_hints_batchlog_request req) {
- rlogger.info("repair[{}]: Started to process repair_flush_hints_batchlog_request from node={}, target_nodes={}, hints_timeout={}s, batchlog_timeout={}s",
- req.repair_uuid, from, req.target_nodes, req.hints_timeout.count(), req.batchlog_timeout.count());
- std::vector<gms::inet_address> target_nodes(req.target_nodes.begin(), req.target_nodes.end());
- db::hints::sync_point sync_point = co_await _sp.local().create_hint_sync_point(std::move(target_nodes));
- lowres_clock::time_point deadline = lowres_clock::now() + req.hints_timeout;
- try {
- co_await coroutine::all(
- [this, &from, &req, &sync_point, &deadline] () -> future<> {
- rlogger.info("repair[{}]: Started to flush hints for repair_flush_hints_batchlog_request from node={}, target_nodes={}", req.repair_uuid, from, req.target_nodes);
- co_await _sp.local().wait_for_hint_sync_point(std::move(sync_point), deadline);
- rlogger.info("repair[{}]: Finished to flush hints for repair_flush_hints_batchlog_request from node={}, target_hosts={}", req.repair_uuid, from, req.target_nodes);
- co_return;
- },
- [this, &from, &req] () -> future<> {
- rlogger.info("repair[{}]: Started to flush batchlog for repair_flush_hints_batchlog_request from node={}, target_nodes={}", req.repair_uuid, from, req.target_nodes);
- co_await _bm.local().do_batch_log_replay();
- rlogger.info("repair[{}]: Finished to flush batchlog for repair_flush_hints_batchlog_request from node={}, target_nodes={}", req.repair_uuid, from, req.target_nodes);
- }
- );
- } catch (...) {
- rlogger.warn("repair[{}]: Failed to process repair_flush_hints_batchlog_request from node={}, target_hosts={}, {}",
- req.repair_uuid, from, req.target_nodes, std::current_exception());
- throw;
- }
- rlogger.info("repair[{}]: Finished to process repair_flush_hints_batchlog_request from node={}, target_nodes={}", req.repair_uuid, from, req.target_nodes);
- co_return repair_flush_hints_batchlog_response();
-}
-
future<> repair_service::init_ms_handlers() {
auto& ms = this->_messaging;
@@ -2497,14 +2430,6 @@ future<> repair_service::init_ms_handlers() {
ms.register_repair_get_diff_algorithms([] (const rpc::client_info& cinfo) {
return make_ready_future<std::vector<row_level_diff_detect_algorithm>>(suportted_diff_detect_algorithms());
});
- ser::partition_checksum_rpc_verbs::register_repair_update_system_table(&ms, [this] (const rpc::client_info& cinfo, repair_update_system_table_request req) {
- auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
- return repair_update_system_table_handler(from, std::move(req));
- });
- ser::partition_checksum_rpc_verbs::register_repair_flush_hints_batchlog(&ms, [this] (const rpc::client_info& cinfo, repair_flush_hints_batchlog_request req) {
- auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
- return repair_flush_hints_batchlog_handler(from, std::move(req));
- });
return make_ready_future<>();
}
@@ -2525,10 +2450,7 @@ future<> repair_service::uninit_ms_handlers() {
ms.unregister_repair_row_level_stop(),
ms.unregister_repair_get_estimated_partitions(),
ms.unregister_repair_set_estimated_partitions(),
- ms.unregister_repair_get_diff_algorithms(),
- ser::partition_checksum_rpc_verbs::unregister_repair_update_system_table(&ms),
- ser::partition_checksum_rpc_verbs::unregister_repair_flush_hints_batchlog(&ms)
- ).discard_result();
+ ms.unregister_repair_get_diff_algorithms()).discard_result();
}
class repair_meta_tracker {
@@ -2600,8 +2522,6 @@ class row_level_repair {
// the next repair.
uint64_t _seed;
- gc_clock::time_point _start_time;
-
public:
row_level_repair(repair_info& ri,
sstring cf_name,
@@ -2614,8 +2534,7 @@ class row_level_repair {
, _range(std::move(range))
, _all_live_peer_nodes(sort_peer_nodes(all_live_peer_nodes))
, _cf(_ri.db.local().find_column_family(_table_id))
- , _seed(get_random_seed())
- , _start_time(gc_clock::now()) {
+ , _seed(get_random_seed()) {
}
private:
@@ -2860,45 +2779,6 @@ class row_level_repair {
master.stats().round_nr_slow_path++;
}
-private:
- // Update system.repair_history table
- future<> update_system_repair_table() {
- // Update repair_history table only if it is a reguar repair.
- if (_ri.reason != streaming::stream_reason::repair) {
- co_return;
- }
- // Update repair_history table only if all replicas have been repaired
- size_t repaired_replicas = _all_live_peer_nodes.size() + 1;
- if (_ri.total_rf != repaired_replicas){
- rlogger.debug("repair[{}]: Skipped to update system.repair_history total_rf={}, repaired_replicas={}, local={}, peers={}",
- _ri.id.uuid, _ri.total_rf, repaired_replicas, utils::fb_utilities::get_broadcast_address(), _all_live_peer_nodes);
- co_return;
- }
- // Update repair_history table only if both hints and batchlog have been flushed.
- if (!_ri.hints_batchlog_flushed()) {
- co_return;
- }
- repair_service& rs = _ri.rs;
- std::optional<gc_clock::time_point> repair_time_opt = co_await rs.update_history(_ri.id.uuid, _table_id, _range, _start_time);
- if (!repair_time_opt) {
- co_return;
- }
- auto repair_time = repair_time_opt.value();
- repair_update_system_table_request req{_ri.id.uuid, _table_id, _ri.keyspace, _cf_name, _range, repair_time};
- auto all_nodes = _all_live_peer_nodes;
- all_nodes.push_back(utils::fb_utilities::get_broadcast_address());
- co_await parallel_for_each(all_nodes, [this, req] (gms::inet_address node) -> future<> {
- try {
- auto& ms = _ri.messaging.local();
- repair_update_system_table_response resp = co_await ser::partition_checksum_rpc_verbs::send_repair_update_system_table(&ms, netw::messaging_service::msg_addr(node), req);
- rlogger.debug("repair[{}]: Finished to update system.repair_history table of node {}", _ri.id.uuid, node);
- } catch (...) {
- rlogger.warn("repair[{}]: Failed to update system.repair_history table of node {}: {}", _ri.id.uuid, node, std::current_exception());
- }
- });
- co_return;
- }
-
public:
future<> run() {
return seastar::async([this] {
@@ -3022,8 +2902,6 @@ class row_level_repair {
} else {
throw std::runtime_error(format("Failed to repair for keyspace={}, cf={}, range={}", _ri.keyspace, _cf_name, _range));
}
- } else {
- update_system_repair_table().get();
}
rlogger.debug("<<< Finished Row Level Repair (Master): local={}, peers={}, repair_meta_id={}, keyspace={}, cf={}, range={}, tx_hashes_nr={}, rx_hashes_nr={}, tx_row_nr={}, rx_row_nr={}, row_from_disk_bytes={}, row_from_disk_nr={}",
master.myip(), _all_live_peer_nodes, master.repair_meta_id(), _ri.keyspace, _cf_name, _range, master.stats().tx_hashes_nr, master.stats().rx_hashes_nr, master.stats().tx_row_nr, master.stats().rx_row_nr, master.stats().row_from_disk_bytes, master.stats().row_from_disk_nr);
@@ -3097,17 +2975,13 @@ class row_level_repair_gossip_helper : public gms::i_endpoint_state_change_subsc
repair_service::repair_service(distributed<gms::gossiper>& gossiper,
netw::messaging_service& ms,
sharded<database>& db,
- sharded<service::storage_proxy>& sp,
- sharded<db::batchlog_manager>& bm,
sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<db::view::view_update_generator>& vug,
service::migration_manager& mm,
size_t max_repair_memory)
: _gossiper(gossiper)
, _messaging(ms)
, _db(db)
- , _sp(sp)
- , _bm(bm)
, _sys_dist_ks(sys_dist_ks)
, _view_update_generator(vug)
, _mm(mm)
@@ -3122,7 +2996,6 @@ repair_service::repair_service(distributed<gms::gossiper>& gossiper,
}
future<> repair_service::start() {
- co_await load_history();
co_await init_metrics();
co_await init_ms_handlers();
}
@@ -3138,79 +3011,3 @@ future<> repair_service::stop() {
repair_service::~repair_service() {
assert(_stopped);
}
-
-static shard_id repair_id_to_shard(utils::UUID& repair_id) {
- return shard_id(repair_id.get_most_significant_bits()) % smp::count;
-}
-
-future<std::optional<gc_clock::time_point>>
-repair_service::update_history(utils::UUID repair_id, utils::UUID table_id, dht::token_range range, gc_clock::time_point repair_time) {
- auto shard = repair_id_to_shard(repair_id);
- return container().invoke_on(shard, [repair_id, table_id, range, repair_time] (repair_service& rs) mutable -> future<std::optional<gc_clock::time_point>> {
- repair_history& rh = rs._finished_ranges_history[repair_id];
- if (rh.repair_time > repair_time) {
- rh.repair_time = repair_time;
- }
- auto finished_shards = ++(rh.finished_ranges[table_id][range]);
- if (finished_shards == smp::count) {
- // All shards have finished repair the range. Send an rpc to ask peers to update system.repair_history table
- rlogger.debug("repair[{}]: Finished range {} for table {} on all shards, updating system.repair_history table, finished_shards={}",
- repair_id, range, table_id, finished_shards);
- co_return rh.repair_time;
- } else {
- rlogger.debug("repair[{}]: Finished range {} for table {} on all shards, updating system.repair_historytable, finished_shards={}",
- repair_id, range, table_id, finished_shards);
- co_return std::nullopt;
- }
- });
-}
-
-future<> repair_service::cleanup_history(utils::UUID repair_id) {
- auto shard = repair_id_to_shard(repair_id);
- return container().invoke_on(shard, [repair_id] (repair_service& rs) mutable {
- rs._finished_ranges_history.erase(repair_id);
- rlogger.debug("repair[{}]: Finished cleaning up repair_service history", repair_id);
- });
-}
-
-future<> repair_service::load_history() {
- auto tables = get_db().local().get_column_families();
- for (const auto& x : tables) {
- auto& table_uuid = x.first;
- auto& table = x.second;
- auto shard = unsigned(table_uuid.get_most_significant_bits()) % smp::count;
- if (shard != this_shard_id()) {
- continue;
- }
- rlogger.info("Loading repair history for keyspace={}, table={}, table_uuid={}",
- table->schema()->ks_name(), table->schema()->cf_name(), table_uuid);
- auto req = format("SELECT * from system.{} WHERE table_uuid = {}", db::system_keyspace::REPAIR_HISTORY, table_uuid);
- co_await db::qctx->qp().query_internal(req, [this] (const cql3::untyped_result_set::row& row) mutable -> future<stop_iteration> {
- auto table_uuid = row.get_as<utils::UUID>("table_uuid");
- auto range_start = row.get_as<int64_t>("range_start");
- auto range_end = row.get_as<int64_t>("range_end");
- auto keyspace_name = row.get_as<sstring>("keyspace_name");
- auto table_name = row.get_as<sstring>("table_name");
- auto start = range_start == std::numeric_limits<int64_t>::min() ? dht::minimum_token() : dht::token::from_int64(range_start);
- auto end = range_end == std::numeric_limits<int64_t>::min() ? dht::maximum_token() : dht::token::from_int64(range_end);
- auto repair_time = to_gc_clock(row.get_as<db_clock::time_point>("repair_time"));
- auto range = dht::token_range(dht::token_range::bound(start, false), dht::token_range::bound(end, true));
- rlogger.debug("Loading repair history for keyspace={}, table={}, table_uuid={}, repair_time={}, range={}",
- keyspace_name, table_name, table_uuid, repair_time, range);
- co_await get_db().invoke_on_all([table_uuid, range, repair_time, keyspace_name, table_name] (database& local_db) -> future<> {
- try {
- auto& table = local_db.find_column_family(table_uuid);
- ::update_repair_time(table.schema(), range, repair_time);
- } catch (no_such_column_family&) {
- rlogger.trace("Table {}.{} with {} does not exist", keyspace_name, table_name, table_uuid);
- } catch (...) {
- rlogger.warn("Failed to load repair history for keyspace={}, table={}, range={}, repair_time={}",
- keyspace_name, table_name, range, repair_time);
- }
- co_return;
- });
- co_return stop_iteration::no;
- });
- }
- co_return;
-}
diff --git a/repair/row_level.hh b/repair/row_level.hh
--- a/repair/row_level.hh
+++ b/repair/row_level.hh
@@ -30,39 +30,26 @@ class row_level_repair_gossip_helper;
namespace service {
class migration_manager;
-class storage_proxy;
}
namespace db {
class system_distributed_keyspace;
-class batchlog_manager;
}
namespace gms {
class gossiper;
}
-class repair_history {
-public:
- // The key for the map is the table_id
- std::unordered_map<utils::UUID, std::unordered_map<dht::token_range, size_t>> finished_ranges;
- gc_clock::time_point repair_time = gc_clock::time_point::max();
-};
-
class repair_service : public seastar::peering_sharded_service<repair_service> {
distributed<gms::gossiper>& _gossiper;
netw::messaging_service& _messaging;
sharded<database>& _db;
- sharded<service::storage_proxy>& _sp;
- sharded<db::batchlog_manager>& _bm;
sharded<db::system_distributed_keyspace>& _sys_dist_ks;
sharded<db::view::view_update_generator>& _view_update_generator;
service::migration_manager& _mm;
- std::unordered_map<utils::UUID, repair_history> _finished_ranges_history;
-
shared_ptr<row_level_repair_gossip_helper> _gossip_helper;
std::unique_ptr<tracker> _tracker;
bool _stopped = false;
@@ -79,8 +66,6 @@ public:
repair_service(distributed<gms::gossiper>& gossiper,
netw::messaging_service& ms,
sharded<database>& db,
- sharded<service::storage_proxy>& sp,
- sharded<db::batchlog_manager>& bm,
sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<db::view::view_update_generator>& vug,
service::migration_manager& mm, size_t max_repair_memory);
@@ -95,10 +80,6 @@ public:
// stop them abruptly).
future<> shutdown();
- future<std::optional<gc_clock::time_point>> update_history(utils::UUID repair_id, utils::UUID table_id, dht::token_range range, gc_clock::time_point repair_time);
- future<> cleanup_history(utils::UUID repair_id);
- future<> load_history();
-
int do_repair_start(sstring keyspace, std::unordered_map<sstring, sstring> options_map);
// The tokens are the tokens assigned to the bootstrap node.
@@ -123,14 +104,6 @@ private:
streaming::stream_reason reason,
std::optional<utils::UUID> ops_uuid);
- future<repair_update_system_table_response> repair_update_system_table_handler(
- gms::inet_address from,
- repair_update_system_table_request req);
-
- future<repair_flush_hints_batchlog_response> repair_flush_hints_batchlog_handler(
- gms::inet_address from,
- repair_flush_hints_batchlog_request req);
-
public:
netw::messaging_service& get_messaging() noexcept { return _messaging; }
sharded<database>& get_db() noexcept { return _db; }
diff --git a/schema.cc b/schema.cc
--- a/schema.cc
+++ b/schema.cc
@@ -40,10 +40,8 @@
#include "dht/i_partitioner.hh"
#include "dht/token-sharding.hh"
#include "cdc/cdc_extension.hh"
-#include "tombstone_gc_extension.hh"
#include "db/paxos_grace_seconds_extension.hh"
#include "utils/rjson.hh"
-#include "tombstone_gc_options.hh"
constexpr int32_t schema::NAME_LENGTH;
@@ -531,7 +529,6 @@ bool operator==(const schema& x, const schema& y)
&& x._raw._compaction_strategy_options == y._raw._compaction_strategy_options
&& x._raw._compaction_enabled == y._raw._compaction_enabled
&& x.cdc_options() == y.cdc_options()
- && x.tombstone_gc_options() == y.tombstone_gc_options()
&& x._raw._caching_options == y._raw._caching_options
&& x._raw._dropped_columns == y._raw._dropped_columns
&& x._raw._collections == y._raw._collections
@@ -1270,26 +1267,11 @@ const cdc::options& schema::cdc_options() const {
return default_cdc_options;
}
-const ::tombstone_gc_options& schema::tombstone_gc_options() const {
- static const ::tombstone_gc_options default_tombstone_gc_options;
- const auto& schema_extensions = _raw._extensions;
-
- if (auto it = schema_extensions.find(tombstone_gc_extension::NAME); it != schema_extensions.end()) {
- return dynamic_pointer_cast<tombstone_gc_extension>(it->second)->get_options();
- }
- return default_tombstone_gc_options;
-}
-
schema_builder& schema_builder::with_cdc_options(const cdc::options& opts) {
add_extension(cdc::cdc_extension::NAME, ::make_shared<cdc::cdc_extension>(opts));
return *this;
}
-schema_builder& schema_builder::with_tombstone_gc_options(const tombstone_gc_options& opts) {
- add_extension(tombstone_gc_extension::NAME, ::make_shared<tombstone_gc_extension>(opts));
- return *this;
-}
-
schema_builder& schema_builder::set_paxos_grace_seconds(int32_t seconds) {
add_extension(db::paxos_grace_seconds_extension::NAME, ::make_shared<db::paxos_grace_seconds_extension>(seconds));
return *this;
diff --git a/schema.hh b/schema.hh
--- a/schema.hh
+++ b/schema.hh
@@ -41,7 +41,6 @@
#include "caching_options.hh"
#include "column_computation.hh"
#include "timestamp.hh"
-#include "tombstone_gc_options.hh"
namespace dht {
@@ -822,8 +821,6 @@ public:
const cdc::options& cdc_options() const;
- const ::tombstone_gc_options& tombstone_gc_options() const;
-
const ::speculative_retry& speculative_retry() const {
return _raw._speculative_retry;
}
diff --git a/schema_builder.hh b/schema_builder.hh
--- a/schema_builder.hh
+++ b/schema_builder.hh
@@ -25,7 +25,6 @@
#include "database_fwd.hh"
#include "cdc/log.hh"
#include "dht/i_partitioner.hh"
-#include "tombstone_gc_options.hh"
struct schema_builder {
public:
@@ -292,7 +291,6 @@ public:
schema_builder& without_indexes();
schema_builder& with_cdc_options(const cdc::options&);
- schema_builder& with_tombstone_gc_options(const tombstone_gc_options& opts);
default_names get_default_names() const {
return default_names(_raw);
diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc
--- a/service/storage_proxy.cc
+++ b/service/storage_proxy.cc
@@ -3145,7 +3145,7 @@ class data_read_resolver : public abstract_read_resolver {
auto mp = mutation_partition(s, m.partition());
auto&& ranges = cmd.slice.row_ranges(s, m.key());
bool always_return_static_content = cmd.slice.options.contains<query::partition_slice::option::always_return_static_content>();
- mp.compact_for_query(s, m.decorated_key(), cmd.timestamp, ranges, always_return_static_content, is_reversed, limit);
+ mp.compact_for_query(s, cmd.timestamp, ranges, always_return_static_content, is_reversed, limit);
return primary_key{m.decorated_key(), get_last_reconciled_row(s, mp, is_reversed)};
}
@@ -3220,7 +3220,7 @@ class data_read_resolver : public abstract_read_resolver {
std::vector<query::clustering_range> ranges;
ranges.emplace_back(is_reversed ? query::clustering_range::make_starting_with(std::move(*shortest_read->clustering))
: query::clustering_range::make_ending_with(std::move(*shortest_read->clustering)));
- it->live_row_count = it->mut.partition().compact_for_query(s, it->mut.decorated_key(), cmd.timestamp, ranges, always_return_static_content,
+ it->live_row_count = it->mut.partition().compact_for_query(s, cmd.timestamp, ranges, always_return_static_content,
is_reversed, query::partition_max_rows);
}
}
diff --git a/sstables/sstables.cc b/sstables/sstables.cc
--- a/sstables/sstables.cc
+++ b/sstables/sstables.cc
@@ -82,7 +82,6 @@
#include "mx/reader.hh"
#include "utils/bit_cast.hh"
#include "utils/cached_file.hh"
-#include "tombstone_gc.hh"
thread_local disk_error_signal_type sstable_read_error;
thread_local disk_error_signal_type sstable_write_error;
@@ -3134,46 +3133,6 @@ std::optional<large_data_stats_entry> sstable::get_large_data_stat(large_data_ty
return std::make_optional<large_data_stats_entry>();
}
-// The gc_before returned by the function can only be used to estimate if the
-// sstable is worth dropping some tombstones. We only return the maximum
-// gc_before for all the partitions that have record in repair history map. It
-// is fine that some of the partitions inside the sstable does not have a
-// record.
-gc_clock::time_point sstable::get_gc_before_for_drop_estimation(const gc_clock::time_point& compaction_time) const {
- auto s = get_schema();
- auto start = get_first_decorated_key().token();
- auto end = get_last_decorated_key().token();
- auto range = dht::token_range(dht::token_range::bound(start, true), dht::token_range::bound(end, true));
- sstlog.trace("sstable={}, ks={}, cf={}, range={}, estimate", get_filename(), s->ks_name(), s->cf_name(), range);
- return ::get_gc_before_for_range(s, range, compaction_time).max_gc_before;
-}
-
-// If the sstable contains any regular live cells, we can not drop the sstable.
-// We do not even bother to query the gc_before. Return
-// gc_clock::time_point::min() as gc_before.
-//
-// If the token range of the sstable contains tokens that do not have a record
-// in the repair history map, we can not drop the sstable, in such case we
-// return gc_clock::time_point::min() as gc_before. Otherwise, return the
-// gc_before from the repair history map.
-gc_clock::time_point sstable::get_gc_before_for_fully_expire(const gc_clock::time_point& compaction_time) const {
- auto deletion_time = get_max_local_deletion_time();
- auto s = get_schema();
- // No need to query gc_before for the sstable if the max_deletion_time is max()
- if (deletion_time == gc_clock::time_point(gc_clock::duration(std::numeric_limits<int>::max()))) {
- sstlog.trace("sstable={}, ks={}, cf={}, get_max_local_deletion_time={}, min_timestamp={}, gc_grace_seconds={}, shortcut",
- get_filename(), s->ks_name(), s->cf_name(), deletion_time, get_stats_metadata().min_timestamp, s->gc_grace_seconds().count());
- return gc_clock::time_point::min();
- }
- auto start = get_first_decorated_key().token();
- auto end = get_last_decorated_key().token();
- auto range = dht::token_range(dht::token_range::bound(start, true), dht::token_range::bound(end, true));
- sstlog.trace("sstable={}, ks={}, cf={}, range={}, get_max_local_deletion_time={}, min_timestamp={}, gc_grace_seconds={}, query",
- get_filename(), s->ks_name(), s->cf_name(), range, deletion_time, get_stats_metadata().min_timestamp, s->gc_grace_seconds().count());
- auto res = ::get_gc_before_for_range(s, range, compaction_time);
- return res.knows_entire_range ? res.min_gc_before : gc_clock::time_point::min();
-}
-
}
namespace seastar {
diff --git a/sstables/sstables.hh b/sstables/sstables.hh
--- a/sstables/sstables.hh
+++ b/sstables/sstables.hh
@@ -896,8 +896,6 @@ public:
friend std::unique_ptr<DataConsumeRowsContext>
data_consume_rows(const schema&, shared_sstable, typename DataConsumeRowsContext::consumer&);
friend void lw_shared_ptr_deleter<sstables::sstable>::dispose(sstable* s);
- gc_clock::time_point get_gc_before_for_drop_estimation(const gc_clock::time_point& compaction_time) const;
- gc_clock::time_point get_gc_before_for_fully_expire(const gc_clock::time_point& compaction_time) const;
};
// When we compact sstables, we have to atomically instantiate the new
diff --git a/table.cc b/table.cc
--- a/table.cc
+++ b/table.cc
@@ -59,6 +59,7 @@ static logging::logger tlogger("table");
static seastar::metrics::label column_family_label("cf");
static seastar::metrics::label keyspace_label("ks");
+
using namespace std::chrono_literals;
flat_mutation_reader_v2
@@ -2407,8 +2408,8 @@ class table::table_state : public compaction::table_state {
const sstables::sstable_set& get_sstable_set() const override {
return _t.get_sstable_set();
}
- std::unordered_set<sstables::shared_sstable> fully_expired_sstables(const std::vector<sstables::shared_sstable>& sstables, gc_clock::time_point query_time) const override {
- return sstables::get_fully_expired_sstables(*this, sstables, query_time);
+ std::unordered_set<sstables::shared_sstable> fully_expired_sstables(const std::vector<sstables::shared_sstable>& sstables) const override {
+ return sstables::get_fully_expired_sstables(*this, sstables, gc_clock::now() - schema()->gc_grace_seconds());
}
const std::vector<sstables::shared_sstable>& compacted_undeleted_sstables() const noexcept override {
return _t.compacted_undeleted_sstables();
diff --git a/test/boost/counter_test.cc b/test/boost/counter_test.cc
--- a/test/boost/counter_test.cc
+++ b/test/boost/counter_test.cc
@@ -266,7 +266,7 @@ SEASTAR_TEST_CASE(test_counter_mutations) {
m = m1;
m.apply(m4);
- m.partition().compact_for_query(*s, m.decorated_key(), gc_clock::now(), { query::clustering_range::make_singular(ck) },
+ m.partition().compact_for_query(*s, gc_clock::now(), { query::clustering_range::make_singular(ck) },
false, false, query::max_rows);
BOOST_REQUIRE_EQUAL(m.partition().clustered_rows().calculate_size(), 0);
BOOST_REQUIRE(m.partition().static_row().empty());
diff --git a/test/boost/memtable_test.cc b/test/boost/memtable_test.cc
--- a/test/boost/memtable_test.cc
+++ b/test/boost/memtable_test.cc
@@ -158,7 +158,7 @@ SEASTAR_TEST_CASE(test_memtable_flush_reader) {
const auto now = gc_clock::now();
auto compacted_muts = muts;
for (auto& mut : compacted_muts) {
- mut.partition().compact_for_compaction(*mut.schema(), always_gc, mut.decorated_key(), now);
+ mut.partition().compact_for_compaction(*mut.schema(), always_gc, now);
}
testlog.info("Simple read");
diff --git a/test/boost/mutation_reader_test.cc b/test/boost/mutation_reader_test.cc
--- a/test/boost/mutation_reader_test.cc
+++ b/test/boost/mutation_reader_test.cc
@@ -1029,7 +1029,7 @@ sstables::shared_sstable create_sstable(sstables::test_env& env, schema_ptr s, s
static mutation compacted(const mutation& m) {
auto result = m;
- result.partition().compact_for_compaction(*result.schema(), always_gc, result.decorated_key(), gc_clock::now());
+ result.partition().compact_for_compaction(*result.schema(), always_gc, gc_clock::now());
return result;
}
@@ -2710,8 +2710,7 @@ SEASTAR_THREAD_TEST_CASE(test_compacting_reader_as_mutation_source) {
streamed_mutation::forwarding fwd_sm,
mutation_reader::forwarding fwd_mr) mutable {
auto source = mt->make_flat_reader(s, std::move(permit), range, slice, pc, std::move(trace_state), streamed_mutation::forwarding::no, fwd_mr);
- auto mr = make_compacting_reader(std::move(source), query_time,
- [] (const dht::decorated_key&) { return api::min_timestamp; });
+ auto mr = make_compacting_reader(std::move(source), query_time, [] (const dht::decorated_key&) { return api::min_timestamp; });
if (single_fragment_buffer) {
mr.set_max_buffer_size(1);
}
@@ -2762,8 +2761,7 @@ SEASTAR_THREAD_TEST_CASE(test_compacting_reader_next_partition) {
}
auto mr = make_compacting_reader(make_flat_mutation_reader_from_fragments(ss.schema(), permit, std::move(mfs)),
- gc_clock::now(),
- [] (const dht::decorated_key&) { return api::min_timestamp; });
+ gc_clock::now(), [] (const dht::decorated_key&) { return api::min_timestamp; });
mr.set_max_buffer_size(buffer_size);
return mr;
diff --git a/test/boost/mutation_test.cc b/test/boost/mutation_test.cc
--- a/test/boost/mutation_test.cc
+++ b/test/boost/mutation_test.cc
@@ -458,7 +458,7 @@ SEASTAR_THREAD_TEST_CASE(test_large_collection_allocation) {
auto res_mut_opt = read_mutation_from_flat_mutation_reader(rd).get0();
BOOST_REQUIRE(res_mut_opt);
- res_mut_opt->partition().compact_for_query(*schema, res_mut_opt->decorated_key(), gc_clock::now(), {query::full_clustering_range}, true, false,
+ res_mut_opt->partition().compact_for_query(*schema, gc_clock::now(), {query::full_clustering_range}, true, false,
std::numeric_limits<uint32_t>::max());
const auto stats_after = memory::stats();
@@ -1246,7 +1246,7 @@ SEASTAR_TEST_CASE(test_mutation_hash) {
static mutation compacted(const mutation& m) {
auto result = m;
- result.partition().compact_for_compaction(*result.schema(), always_gc, result.decorated_key(), gc_clock::now());
+ result.partition().compact_for_compaction(*result.schema(), always_gc, gc_clock::now());
return result;
}
@@ -1639,7 +1639,7 @@ SEASTAR_TEST_CASE(test_tombstone_purge) {
tombstone tomb(api::new_timestamp(), gc_clock::now() - std::chrono::seconds(1));
m.partition().apply(tomb);
BOOST_REQUIRE(!m.partition().empty());
- m.partition().compact_for_compaction(*s, always_gc, m.decorated_key(), gc_clock::now());
+ m.partition().compact_for_compaction(*s, always_gc, gc_clock::now());
// Check that row was covered by tombstone.
BOOST_REQUIRE(m.partition().empty());
// Check that tombstone was purged after compact_for_compaction().
@@ -1745,11 +1745,11 @@ SEASTAR_TEST_CASE(test_trim_rows) {
auto compact_and_expect_empty = [&] (mutation m, std::vector<query::clustering_range> ranges) {
mutation m2 = m;
- m.partition().compact_for_query(*s, m.decorated_key(), now, ranges, false, false, query::max_rows);
+ m.partition().compact_for_query(*s, now, ranges, false, false, query::max_rows);
BOOST_REQUIRE(m.partition().clustered_rows().empty());
std::reverse(ranges.begin(), ranges.end());
- m2.partition().compact_for_query(*s, m2.decorated_key(), now, ranges, false, true, query::max_rows);
+ m2.partition().compact_for_query(*s, now, ranges, false, true, query::max_rows);
BOOST_REQUIRE(m2.partition().clustered_rows().empty());
};
@@ -1831,8 +1831,8 @@ SEASTAR_TEST_CASE(test_mutation_diff_with_random_generator) {
if (s != m2.schema()) {
return;
}
- m1.partition().compact_for_compaction(*s, never_gc, m1.decorated_key(), now);
- m2.partition().compact_for_compaction(*s, never_gc, m2.decorated_key(), now);
+ m1.partition().compact_for_compaction(*s, never_gc, now);
+ m2.partition().compact_for_compaction(*s, never_gc, now);
auto m12 = m1;
m12.apply(m2);
auto m12_with_diff = m1;
@@ -2950,7 +2950,6 @@ void run_compaction_data_stream_split_test(const schema& schema, reader_permit p
auto get_max_purgeable = [] (const dht::decorated_key&) {
return api::max_timestamp;
};
- auto gc_grace_seconds = schema.gc_grace_seconds();
auto consumer = make_stable_flattened_mutations_consumer<compact_for_compaction<survived_compacted_fragments_consumer, purged_compacted_fragments_consumer>>(
schema,
query_time,
diff --git a/test/boost/mutation_writer_test.cc b/test/boost/mutation_writer_test.cc
--- a/test/boost/mutation_writer_test.cc
+++ b/test/boost/mutation_writer_test.cc
@@ -371,12 +371,12 @@ SEASTAR_THREAD_TEST_CASE(test_timestamp_based_splitting_mutation_writer) {
const auto now = gc_clock::now();
for (auto& m : muts) {
- m.partition().compact_for_compaction(*random_schema.schema(), always_gc, m.decorated_key(), now);
+ m.partition().compact_for_compaction(*random_schema.schema(), always_gc, now);
}
std::vector<mutation> combined_mutations;
while (auto m = read_mutation_from_flat_mutation_reader(reader).get0()) {
- m->partition().compact_for_compaction(*random_schema.schema(), always_gc, m->decorated_key(), now);
+ m->partition().compact_for_compaction(*random_schema.schema(), always_gc, now);
combined_mutations.emplace_back(std::move(*m));
}
diff --git a/test/boost/mvcc_test.cc b/test/boost/mvcc_test.cc
--- a/test/boost/mvcc_test.cc
+++ b/test/boost/mvcc_test.cc
@@ -695,8 +695,8 @@ SEASTAR_TEST_CASE(test_snapshot_cursor_is_consistent_with_merging) {
// Drop empty rows
can_gc_fn never_gc = [] (tombstone) { return false; };
- actual.compact_for_compaction(*s, never_gc, m1.decorated_key(), gc_clock::now());
- expected.compact_for_compaction(*s, never_gc, m1.decorated_key(), gc_clock::now());
+ actual.compact_for_compaction(*s, never_gc, gc_clock::now());
+ expected.compact_for_compaction(*s, never_gc, gc_clock::now());
assert_that(s, actual).is_equal_to(expected);
}
diff --git a/test/boost/sstable_compaction_test.cc b/test/boost/sstable_compaction_test.cc
--- a/test/boost/sstable_compaction_test.cc
+++ b/test/boost/sstable_compaction_test.cc
@@ -150,8 +150,8 @@ class table_state_for_test : public table_state {
const sstables::sstable_set& get_sstable_set() const override {
return _t->get_sstable_set();
}
- std::unordered_set<sstables::shared_sstable> fully_expired_sstables(const std::vector<sstables::shared_sstable>& sstables, gc_clock::time_point query_time) const override {
- return sstables::get_fully_expired_sstables(_t->as_table_state(), sstables, query_time);
+ std::unordered_set<sstables::shared_sstable> fully_expired_sstables(const std::vector<sstables::shared_sstable>& sstables) const override {
+ return sstables::get_fully_expired_sstables(_t->as_table_state(), sstables, gc_clock::now() - schema()->gc_grace_seconds());
}
const std::vector<sstables::shared_sstable>& compacted_undeleted_sstables() const noexcept override {
return _compacted_undeleted;
@@ -1394,7 +1394,7 @@ SEASTAR_TEST_CASE(get_fully_expired_sstables_test) {
auto sst2 = add_sstable_for_overlapping_test(env, cf, /*gen*/2, min_key, key_and_token_pair[2].first, build_stats(t0, t1, std::numeric_limits<int32_t>::max()));
auto sst3 = add_sstable_for_overlapping_test(env, cf, /*gen*/3, min_key, max_key, build_stats(t3, t4, std::numeric_limits<int32_t>::max()));
std::vector<sstables::shared_sstable> compacting = { sst1, sst2 };
- auto expired = get_fully_expired_sstables(cf->as_table_state(), compacting, /*gc before*/gc_clock::from_time_t(15) + cf->schema()->gc_grace_seconds());
+ auto expired = get_fully_expired_sstables(cf->as_table_state(), compacting, /*gc before*/gc_clock::from_time_t(15));
BOOST_REQUIRE(expired.size() == 0);
}
@@ -1406,7 +1406,7 @@ SEASTAR_TEST_CASE(get_fully_expired_sstables_test) {
auto sst2 = add_sstable_for_overlapping_test(env, cf, /*gen*/2, min_key, key_and_token_pair[2].first, build_stats(t2, t3, std::numeric_limits<int32_t>::max()));
auto sst3 = add_sstable_for_overlapping_test(env, cf, /*gen*/3, min_key, max_key, build_stats(t3, t4, std::numeric_limits<int32_t>::max()));
std::vector<sstables::shared_sstable> compacting = { sst1, sst2 };
- auto expired = get_fully_expired_sstables(cf->as_table_state(), compacting, /*gc before*/gc_clock::from_time_t(25) + cf->schema()->gc_grace_seconds());
+ auto expired = get_fully_expired_sstables(cf->as_table_state(), compacting, /*gc before*/gc_clock::from_time_t(25));
BOOST_REQUIRE(expired.size() == 1);
auto expired_sst = *expired.begin();
BOOST_REQUIRE(expired_sst->generation() == 1);
@@ -3370,7 +3370,6 @@ SEASTAR_TEST_CASE(purged_tombstone_consumer_sstable_test) {
auto gc_now = gc_clock::now();
gc_before = gc_now - s->gc_grace_seconds();
- auto gc_grace_seconds = s->gc_grace_seconds();
auto cfc = make_stable_flattened_mutations_consumer<compact_for_compaction<compacting_sstable_writer_test, compacting_sstable_writer_test>>(
*s, gc_now, max_purgeable_func, std::move(cr), std::move(purged_cr));
diff --git a/test/lib/flat_mutation_reader_assertions.hh b/test/lib/flat_mutation_reader_assertions.hh
--- a/test/lib/flat_mutation_reader_assertions.hh
+++ b/test/lib/flat_mutation_reader_assertions.hh
@@ -501,7 +501,7 @@ public:
BOOST_REQUIRE(bool(mo));
memory::scoped_critical_alloc_section dfg;
mutation got = *mo;
- got.partition().compact_for_compaction(*m.schema(), always_gc, got.decorated_key(), query_time);
+ got.partition().compact_for_compaction(*m.schema(), always_gc, query_time);
assert_that(got).is_equal_to(m, ck_ranges);
return *this;
}
@@ -912,7 +912,7 @@ public:
BOOST_REQUIRE(bool(mo));
memory::scoped_critical_alloc_section dfg;
mutation got = *mo;
- got.partition().compact_for_compaction(*m.schema(), always_gc, got.decorated_key(), query_time);
+ got.partition().compact_for_compaction(*m.schema(), always_gc, query_time);
assert_that(got).is_equal_to(m, ck_ranges);
return *this;
}
diff --git a/test/lib/mutation_source_test.cc b/test/lib/mutation_source_test.cc
--- a/test/lib/mutation_source_test.cc
+++ b/test/lib/mutation_source_test.cc
@@ -945,7 +945,7 @@ void test_all_data_is_read_back(tests::reader_concurrency_semaphore_wrapper& sem
for_each_mutation([&semaphore, &populate, query_time] (const mutation& m) mutable {
auto ms = populate(m.schema(), {m}, query_time);
mutation copy(m);
- copy.partition().compact_for_compaction(*copy.schema(), always_gc, copy.decorated_key(), query_time);
+ copy.partition().compact_for_compaction(*copy.schema(), always_gc, query_time);
assert_that(ms.make_reader(m.schema(), semaphore.make_permit())).produces_compacted(copy, query_time);
});
}
@@ -1623,7 +1623,7 @@ void test_reader_conversions(tests::reader_concurrency_semaphore_wrapper& semaph
const auto query_time = gc_clock::now();
mutation m_compacted(m);
- m_compacted.partition().compact_for_compaction(*m_compacted.schema(), always_gc, m_compacted.decorated_key(), query_time);
+ m_compacted.partition().compact_for_compaction(*m_compacted.schema(), always_gc, query_time);
{
auto rd = ms.make_reader_v2(m.schema(), semaphore.make_permit());
diff --git a/tombstone_gc.cc b/tombstone_gc.cc
--- a/tombstone_gc.cc
+++ b/tombstone_gc.cc
@@ -1,195 +0,0 @@
-/*
- * Copyright (C) 2021-present ScyllaDB
- */
-
-/*
- * This file is part of Scylla.
- *
- * Scylla is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * Scylla is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with Scylla. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#include <chrono>
-#include <boost/icl/interval.hpp>
-#include <boost/icl/interval_map.hpp>
-#include "schema.hh"
-#include "dht/i_partitioner.hh"
-#include "gc_clock.hh"
-#include "tombstone_gc.hh"
-#include "locator/token_metadata.hh"
-#include "exceptions/exceptions.hh"
-#include "locator/abstract_replication_strategy.hh"
-#include "database.hh"
-#include "gms/feature_service.hh"
-
-extern logging::logger dblog;
-
-class repair_history_map {
-public:
- boost::icl::interval_map<dht::token, gc_clock::time_point, boost::icl::partial_absorber, std::less, boost::icl::inplace_max> map;
-};
-
-thread_local std::unordered_map<utils::UUID, seastar::lw_shared_ptr<repair_history_map>> repair_history_maps;
-
-static seastar::lw_shared_ptr<repair_history_map> get_or_create_repair_history_map_for_table(const utils::UUID& id) {
- auto it = repair_history_maps.find(id);
- if (it != repair_history_maps.end()) {
- return it->second;
- } else {
- repair_history_maps[id] = seastar::make_lw_shared<repair_history_map>();
- return repair_history_maps[id];
- }
-}
-
-seastar::lw_shared_ptr<repair_history_map> get_repair_history_map_for_table(const utils::UUID& id) {
- auto it = repair_history_maps.find(id);
- if (it != repair_history_maps.end()) {
- return it->second;
- } else {
- return {};
- }
-}
-
-void drop_repair_history_map_for_table(const utils::UUID& id) {
- repair_history_maps.erase(id);
-}
-
-// This is useful for a sstable to query a gc_before for a range. The range is
-// defined by the first and last key in the sstable.
-//
-// The min_gc_before and max_gc_before returned are the min and max gc_before for all the keys in the range.
-//
-// The knows_entire_range is set to true:
-// 1) if the tombstone_gc_mode is not repair, since we have the same value for all the keys in the ranges.
-// 2) if the tombstone_gc_mode is repair, and the range is a sub range of a range in the repair history map.
-get_gc_before_for_range_result get_gc_before_for_range(schema_ptr s, const dht::token_range& range, const gc_clock::time_point& query_time) {
- bool knows_entire_range = true;
- const auto& options = s->tombstone_gc_options();
- switch (options.mode()) {
- case tombstone_gc_mode::timeout: {
- dblog.trace("Get gc_before for ks={}, table={}, range={}, mode=timeout", s->ks_name(), s->cf_name(), range);
- auto gc_before = saturating_subtract(query_time, s->gc_grace_seconds());
- return {gc_before, gc_before, knows_entire_range};
- }
- case tombstone_gc_mode::disabled: {
- dblog.trace("Get gc_before for ks={}, table={}, range={}, mode=disabled", s->ks_name(), s->cf_name(), range);
- return {gc_clock::time_point::min(), gc_clock::time_point::min(), knows_entire_range};
- }
- case tombstone_gc_mode::immediate: {
- dblog.trace("Get gc_before for ks={}, table={}, range={}, mode=immediate", s->ks_name(), s->cf_name(), range);
- return {gc_clock::time_point::max(), gc_clock::time_point::max(), knows_entire_range};
- }
- case tombstone_gc_mode::repair: {
- const std::chrono::seconds& propagation_delay = options.propagation_delay_in_seconds();
- auto min_gc_before = gc_clock::time_point::min();
- auto max_gc_before = gc_clock::time_point::min();
- auto min_repair_timestamp = gc_clock::time_point::min();
- auto max_repair_timestamp = gc_clock::time_point::min();
- int hits = 0;
- knows_entire_range = false;
- auto m = get_repair_history_map_for_table(s->id());
- if (m) {
- auto interval = locator::token_metadata::range_to_interval(range);
- auto min = gc_clock::time_point::max();
- auto max = gc_clock::time_point::min();
- bool contains_all = false;
- for (auto& x : boost::make_iterator_range(m->map.equal_range(interval))) {
- auto r = locator::token_metadata::interval_to_range(x.first);
- min = std::min(x.second, min);
- max = std::max(x.second, max);
- if (++hits == 1 && r.contains(range, dht::tri_compare)) {
- contains_all = true;
- }
- }
- if (hits == 0) {
- min_repair_timestamp = gc_clock::time_point::min();
- max_repair_timestamp = gc_clock::time_point::min();
- } else {
- knows_entire_range = hits == 1 && contains_all;
- min_repair_timestamp = min;
- max_repair_timestamp = max;
- }
- min_gc_before = saturating_subtract(min_repair_timestamp, propagation_delay);
- max_gc_before = saturating_subtract(max_repair_timestamp, propagation_delay);
- };
- dblog.trace("Get gc_before for ks={}, table={}, range={}, mode=repair, min_repair_timestamp={}, max_repair_timestamp={}, propagation_delay={}, min_gc_before={}, max_gc_before={}, hits={}, knows_entire_range={}",
- s->ks_name(), s->cf_name(), range, min_repair_timestamp, max_repair_timestamp, propagation_delay.count(), min_gc_before, max_gc_before, hits, knows_entire_range);
- return {min_gc_before, max_gc_before, knows_entire_range};
- }
- }
-}
-
-gc_clock::time_point get_gc_before_for_key(schema_ptr s, const dht::decorated_key& dk, const gc_clock::time_point& query_time) {
- // if mode = timeout // default option, if user does not specify tombstone_gc options
- // if mode = disabled // never gc tombstone
- // if mode = immediate // can gc tombstone immediately
- // if mode = repair // gc after repair
- const auto& options = s->tombstone_gc_options();
- switch (options.mode()) {
- case tombstone_gc_mode::timeout:
- dblog.trace("Get gc_before for ks={}, table={}, dk={}, mode=timeout", s->ks_name(), s->cf_name(), dk);
- return saturating_subtract(query_time, s->gc_grace_seconds());
- case tombstone_gc_mode::disabled:
- dblog.trace("Get gc_before for ks={}, table={}, dk={}, mode=disabled", s->ks_name(), s->cf_name(), dk);
- return gc_clock::time_point::min();
- case tombstone_gc_mode::immediate:
- dblog.trace("Get gc_before for ks={}, table={}, dk={}, mode=immediate", s->ks_name(), s->cf_name(), dk);
- return gc_clock::time_point::max();
- case tombstone_gc_mode::repair:
- const std::chrono::seconds& propagation_delay = options.propagation_delay_in_seconds();
- auto gc_before = gc_clock::time_point::min();
- auto repair_timestamp = gc_clock::time_point::min();
- auto m = get_repair_history_map_for_table(s->id());
- if (m) {
- const auto it = m->map.find(dk.token());
- if (it == m->map.end()) {
- gc_before = gc_clock::time_point::min();
- } else {
- repair_timestamp = it->second;
- gc_before = saturating_subtract(repair_timestamp, propagation_delay);
- }
- }
- dblog.trace("Get gc_before for ks={}, table={}, dk={}, mode=repair, repair_timestamp={}, propagation_delay={}, gc_before={}",
- s->ks_name(), s->cf_name(), dk, repair_timestamp, propagation_delay.count(), gc_before);
- return gc_before;
- }
-}
-
-void update_repair_time(schema_ptr s, const dht::token_range& range, gc_clock::time_point repair_time) {
- auto m = get_or_create_repair_history_map_for_table(s->id());
- m->map += std::make_pair(locator::token_metadata::range_to_interval(range), repair_time);
-}
-
-static bool needs_repair_before_gc(const database& db, sstring ks_name) {
- // If a table uses local replication strategy or rf one, there is no
- // need to run repair even if tombstone_gc mode = repair.
- auto& ks = db.find_keyspace(ks_name);
- auto& rs = ks.get_replication_strategy();
- auto erm = ks.get_effective_replication_map();
- bool needs_repair = rs.get_type() != locator::replication_strategy_type::local
- && erm->get_replication_factor() != 1;
- return needs_repair;
-}
-
-void validate_tombstone_gc_options(const tombstone_gc_options* options, const database& db, sstring ks_name) {
- if (!options) {
- return;
- }
- if (!db.features().cluster_supports_tombstone_gc_options()) {
- throw exceptions::configuration_exception("tombstone_gc option not supported by the cluster");
- }
-
- if (options->mode() == tombstone_gc_mode::repair && !needs_repair_before_gc(db, ks_name)) {
- throw exceptions::configuration_exception("tombstone_gc option with mode = repair not supported for table with RF one or local replication strategy");
- }
-}
diff --git a/tombstone_gc.hh b/tombstone_gc.hh
--- a/tombstone_gc.hh
+++ b/tombstone_gc.hh
@@ -1,51 +0,0 @@
-/*
- * Copyright (C) 2021-present ScyllaDB
- */
-
-/*
- * This file is part of Scylla.
- *
- * Scylla is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * Scylla is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with Scylla. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#pragma once
-
-#include <seastar/core/shared_ptr.hh>
-#include "gc_clock.hh"
-#include "dht/token.hh"
-#include "schema_fwd.hh"
-
-namespace dht {
-
-class decorated_key;
-
-using token_range = nonwrapping_range<token>;
-
-}
-
-struct get_gc_before_for_range_result {
- gc_clock::time_point min_gc_before;
- gc_clock::time_point max_gc_before;
- bool knows_entire_range;
-};
-
-void drop_repair_history_map_for_table(const utils::UUID& id);
-
-get_gc_before_for_range_result get_gc_before_for_range(schema_ptr s, const dht::token_range& range, const gc_clock::time_point& query_time);
-
-gc_clock::time_point get_gc_before_for_key(schema_ptr s, const dht::decorated_key& dk, const gc_clock::time_point& query_time);
-
-void update_repair_time(schema_ptr s, const dht::token_range& range, gc_clock::time_point repair_time);
-
-void validate_tombstone_gc_options(const tombstone_gc_options* options, const database& db, sstring ks_name);
diff --git a/tombstone_gc_extension.hh b/tombstone_gc_extension.hh
--- a/tombstone_gc_extension.hh
+++ b/tombstone_gc_extension.hh
@@ -1,56 +0,0 @@
-/*
- * Copyright 2021-present ScyllaDB
- */
-/*
- * This file is part of Scylla.
- *
- * Scylla is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * Scylla is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with Scylla. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#pragma once
-
-#include <map>
-
-#include <seastar/core/sstring.hh>
-
-#include "bytes.hh"
-#include "serializer.hh"
-#include "db/extensions.hh"
-#include "schema.hh"
-#include "serializer_impl.hh"
-#include "tombstone_gc_options.hh"
-
-class tombstone_gc_extension : public schema_extension {
- tombstone_gc_options _tombstone_gc_options;
-public:
- static constexpr auto NAME = "tombstone_gc";
-
- tombstone_gc_extension() = default;
- tombstone_gc_extension(const tombstone_gc_options& opts) : _tombstone_gc_options(opts) {}
- explicit tombstone_gc_extension(std::map<seastar::sstring, seastar::sstring> tags) : _tombstone_gc_options(std::move(tags)) {}
- explicit tombstone_gc_extension(const bytes& b) : _tombstone_gc_options(tombstone_gc_extension::deserialize(b)) {}
- explicit tombstone_gc_extension(const seastar::sstring& s) {
- throw std::logic_error("Cannot create tombstone_gc_extension info from string");
- }
- bytes serialize() const override {
- return ser::serialize_to_buffer<bytes>(_tombstone_gc_options.to_map());
- }
- static std::map<seastar::sstring, seastar::sstring> deserialize(const bytes_view& buffer) {
- return ser::deserialize_from_buffer(buffer, boost::type<std::map<seastar::sstring, seastar::sstring>>());
- }
- const tombstone_gc_options& get_options() const {
- return _tombstone_gc_options;
- }
-};
-
diff --git a/tombstone_gc_options.cc b/tombstone_gc_options.cc
--- a/tombstone_gc_options.cc
+++ b/tombstone_gc_options.cc
@@ -1,90 +0,0 @@
-/*
- * Copyright (C) 2021-present ScyllaDB
- */
-
-/*
- * This file is part of Scylla.
- *
- * Scylla is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * Scylla is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with Scylla. If not, see <http://www.gnu.org/licenses/>.
- */
-
-
-#include "tombstone_gc_options.hh"
-#include "exceptions/exceptions.hh"
-#include <boost/lexical_cast.hpp>
-#include <seastar/core/sstring.hh>
-#include <map>
-#include "utils/rjson.hh"
-
-tombstone_gc_options::tombstone_gc_options(const std::map<seastar::sstring, seastar::sstring>& map) {
- for (const auto& x : map) {
- if (x.first == "mode") {
- if (x.second == "disabled") {
- _mode = tombstone_gc_mode::disabled;
- } else if (x.second == "repair") {
- _mode = tombstone_gc_mode::repair;
- } else if (x.second == "timeout") {
- _mode = tombstone_gc_mode::timeout;
- } else if (x.second == "immediate") {
- _mode = tombstone_gc_mode::immediate;
- } else {
- throw exceptions::configuration_exception(format("Invalid value for tombstone_gc option mode: {}", x.second));
- }
- } else if (x.first == "propagation_delay_in_seconds") {
- try {
- auto seconds = boost::lexical_cast<int64_t>(x.second);
- if (seconds < 0) {
- throw exceptions::configuration_exception(format("Invalid value for tombstone_gc option propagation_delay_in_seconds: {}", x.second));
- }
- _propagation_delay_in_seconds = std::chrono::seconds(seconds);
- } catch (...) {
- throw exceptions::configuration_exception(format("Invalid value for tombstone_gc option propagation_delay_in_seconds: {}", x.second));
- }
- } else {
- throw exceptions::configuration_exception(format("Invalid tombstone_gc option: {}", x.first));
- }
- }
-}
-
-std::map<seastar::sstring, seastar::sstring> tombstone_gc_options::to_map() const {
- std::map<seastar::sstring, seastar::sstring> res = {
- {"mode", format("{}", _mode)},
- {"propagation_delay_in_seconds", format("{}", _propagation_delay_in_seconds.count())},
- };
- return res;
-}
-
-seastar::sstring tombstone_gc_options::to_sstring() const {
- return rjson::print(rjson::from_string_map(to_map()));
-}
-
-bool
-tombstone_gc_options::operator==(const tombstone_gc_options& other) const {
- return _mode == other._mode && _propagation_delay_in_seconds == other._propagation_delay_in_seconds;
-}
-
-bool
-tombstone_gc_options::operator!=(const tombstone_gc_options& other) const {
- return !(*this == other);
-}
-
-std::ostream& operator<<(std::ostream& os, const tombstone_gc_mode& mode) {
- switch (mode) {
- case tombstone_gc_mode::timeout: return os << "timeout";
- case tombstone_gc_mode::disabled: return os << "disabled";
- case tombstone_gc_mode::immediate: return os << "immediate";
- case tombstone_gc_mode::repair: return os << "repair";
- }
- return os << "unknown";
-}
diff --git a/tombstone_gc_options.hh b/tombstone_gc_options.hh
--- a/tombstone_gc_options.hh
+++ b/tombstone_gc_options.hh
@@ -1,47 +0,0 @@
-/*
- * Copyright (C) 2021-present ScyllaDB
- */
-
-/*
- * This file is part of Scylla.
- *
- * Scylla is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * Scylla is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with Scylla. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#pragma once
-
-#include <map>
-#include <chrono>
-#include <seastar/core/sstring.hh>
-
-enum class tombstone_gc_mode : uint8_t { timeout, disabled, immediate, repair };
-
-class tombstone_gc_options {
-private:
- tombstone_gc_mode _mode = tombstone_gc_mode::timeout;
- std::chrono::seconds _propagation_delay_in_seconds = std::chrono::seconds(3600);
-public:
- tombstone_gc_options() = default;
- const tombstone_gc_mode& mode() const { return _mode; }
- explicit tombstone_gc_options(const std::map<seastar::sstring, seastar::sstring>& map);
- const std::chrono::seconds& propagation_delay_in_seconds() const {
- return _propagation_delay_in_seconds;
- }
- std::map<seastar::sstring, seastar::sstring> to_map() const;
- seastar::sstring to_sstring() const;
- bool operator==(const tombstone_gc_options& other) const;
- bool operator!=(const tombstone_gc_options& other) const;
-};
-
-std::ostream& operator<<(std::ostream& os, const tombstone_gc_mode& m);