From: Avi Kivity <a...@scylladb.com>
Committer: Avi Kivity <a...@scylladb.com>
Branch: next
Merge "Implement validation compaction" from Botond
"
Currently, when sstables are suspected to be corrupt, one has a few bad
choices for verifying whether they are indeed corrupt:
* Obtain the suspect sstable files and manually inspect them. This is
problematic because it requires a Scylla engineer to have direct access
to the data, which is not always simple or even possible due to privacy
protection rules.
* Run sstable scrub in abort mode. This is enough to confirm whether
there is any corruption, but only in a binary manner. It is not
possible to explore the full scope of the corruption, as the scrub
aborts on the first corruption it finds.
* Run sstable scrub in non-abort mode. Although this allows exploring
the full scope of the corruption, and even gets rid of it, it is a very
intrusive and potentially destructive process that some users might not
be willing to risk.
This patchset offers an alternative: validation compaction. This is a
completely non-intrusive compaction that reads all sstables in turn and
validates their contents, logging any discrepancies it finds. It does
not mutate their content; it doesn't even rewrite them. It is akin to
a dry-run mode for sstable scrub. The reason it was not implemented as
such is that the current compaction infrastructure assumes that input
sstables are replaced by output sstables as part of the compaction
process. Lifting this assumption seemed error-prone and risky, so
instead I snatched the unused "Validation" compaction type for this
purpose. This compaction type completely bypasses the regular compaction
infrastructure, but only at the low level: it still integrates fully
with the compaction manager.
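As an illustration, a validation run could be triggered through the new
REST endpoint added by this series, along these lines (a sketch only;
the default REST API port 10000 and the keyspace/table names are
assumptions):
    curl -X POST "http://localhost:10000/storage_service/keyspace_validate/my_keyspace?cf=my_table"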
Fixes: #7736
Refs: https://github.com/scylladb/scylla-tools-java/issues/263
Tests: unit(dev)
"
* 'validation-compaction/v5' of
https://github.com/denesb/scylla:
test/boost/sstable_datafile_test: add test for validation compaction
test/boost/sstable_datafile_test: scrub tests: extract corrupt sst writer code into function
api: storage_service: expose validation compaction
sstables/compaction_manager: add perform_sstable_validation()
sstables/compaction_manager: rewrite_sstables(): resolve maintenance group FIXME
sstables/compaction_manager: add maintenance scheduling group
sstables/compaction_manager: drop _scheduling_group field
sstables/compaction_manager: run_custom_job(): replace parameter name with compaction type
sstables/compaction_manager: run_custom_job(): keep job function alive
sstables/compaction_descriptor: compaction_options: add validation compaction type
sstables/compaction: compaction_options::type(): add static assert for size of index_to_type
sstables/compaction: implement validation compaction type
sstables/compaction: extract compaction info creation into static method
sstables/compaction: extract sstable list formatting to a class
sstables/compaction: scrub_compaction: extract reporting code into static methods
position_in_partition{_view}: add has_key()
mutation_fragment_stream_validator: add schema() accessor
---
diff --git a/api/api-doc/storage_service.json b/api/api-doc/storage_service.json
--- a/api/api-doc/storage_service.json
+++ b/api/api-doc/storage_service.json
@@ -765,6 +765,38 @@
}
]
},
+ {
+ "path":"/storage_service/keyspace_validate/{keyspace}",
+ "operations":[
+ {
+ "method":"POST",
+ "summary":"Trigger a validation of sstables on a single keyspace",
+ "type": "long",
+ "nickname":"validate",
+ "produces":[
+ "application/json"
+ ],
+ "parameters":[
+ {
+ "name":"keyspace",
+ "description":"The keyspace to validate",
+ "required":true,
+ "allowMultiple":false,
+ "type":"string",
+ "paramType":"path"
+ },
+ {
+ "name":"cf",
+ "description":"Comma seperated column family names",
+ "required":false,
+ "allowMultiple":false,
+ "type":"string",
+ "paramType":"query"
+ }
+ ]
+ }
+ ]
+ },
{
"path":"/storage_service/keyspace_scrub/{keyspace}",
"operations":[
diff --git a/api/storage_service.cc b/api/storage_service.cc
--- a/api/storage_service.cc
+++ b/api/storage_service.cc
@@ -541,6 +541,19 @@ void set_storage_service(http_context& ctx, routes& r) {
});
});
+ ss::validate.set(r, wrap_ks_cf(ctx, [] (http_context& ctx, std::unique_ptr<request> req, sstring keyspace, std::vector<sstring> column_families ) {
+ return ctx.db.invoke_on_all([=] (database& db) {
+ return do_for_each(column_families, [=, &db](sstring cfname) {
+ auto& cm = db.get_compaction_manager();
+ auto& cf = db.find_column_family(keyspace, cfname);
+ return cm.perform_sstable_validation(&cf);
+ });
+ }).then([]{
+ return make_ready_future<json::json_return_type>(0);
+ });
+
+ }));
+
ss::upgrade_sstables.set(r, wrap_ks_cf(ctx, [] (http_context& ctx, std::unique_ptr<request> req, sstring keyspace, std::vector<sstring> column_families) {
bool exclude_current_version = req_param<bool>(*req, "exclude_current_version", false);
diff --git a/compaction/compaction.cc b/compaction/compaction.cc
--- a/compaction/compaction.cc
+++ b/compaction/compaction.cc
@@ -48,9 +48,11 @@
#include <boost/range/adaptors.hpp>
#include <boost/range/join.hpp>
#include <boost/algorithm/cxx11/any_of.hpp>
+#include <boost/algorithm/string/join.hpp>
#include <seastar/core/future-util.hh>
#include <seastar/core/scheduling.hh>
+#include <seastar/core/coroutine.hh>
#include <seastar/util/closeable.hh>
#include "sstables/sstables.hh"
@@ -462,6 +464,35 @@ class garbage_collected_sstable_writer {
}
};
+class formatted_sstables_list {
+ bool _include_origin = true;
+ std::vector<sstring> _ssts;
+public:
+ formatted_sstables_list() = default;
+ explicit formatted_sstables_list(const std::vector<shared_sstable>& ssts, bool include_origin) : _include_origin(include_origin) {
+ _ssts.reserve(ssts.size());
+ for (const auto& sst : ssts) {
+ *this += sst;
+ }
+ }
+ formatted_sstables_list& operator+=(const shared_sstable& sst) {
+ if (_include_origin) {
+ _ssts.emplace_back(format("{}:level={:d}:origin={}", sst->get_filename(), sst->get_sstable_level(), sst->get_origin()));
+ } else {
+ _ssts.emplace_back(format("{}:level={:d}", sst->get_filename(), sst->get_sstable_level()));
+ }
+ return *this;
+ }
+ friend std::ostream& operator<<(std::ostream& os, const formatted_sstables_list& lst);
+};
+
+std::ostream& operator<<(std::ostream& os, const formatted_sstables_list& lst) {
+ os << "[";
+ os << boost::algorithm::join(lst._ssts, ",");
+ os << "]";
+ return os;
+}
+
class compaction {
protected:
column_family& _cf;
@@ -475,7 +506,7 @@ class compaction {
lw_shared_ptr<sstable_set> _compacting;
uint64_t _max_sstable_size;
uint32_t _sstable_level;
- lw_shared_ptr<compaction_info> _info = make_lw_shared<compaction_info>();
+ lw_shared_ptr<compaction_info> _info;
uint64_t _estimated_partitions = 0;
std::vector<unsigned long> _ancestors;
db::replay_position _rp;
@@ -491,6 +522,18 @@ class compaction {
// used to incrementally calculate max purgeable timestamp, as we iterate through decorated keys.
std::optional<sstable_set::incremental_selector> _selector;
std::unordered_set<shared_sstable> _compacting_for_max_purgeable_func;
+public:
+ static lw_shared_ptr<compaction_info> create_compaction_info(column_family& cf, compaction_descriptor descriptor) {
+ auto info = make_lw_shared<compaction_info>();
+ info->ks_name = cf.schema()->ks_name();
+ info->cf_name = cf.schema()->cf_name();
+ info->type = descriptor.options.type();
+ info->run_identifier = descriptor.run_identifier;
+ info->cf = &cf;
+ info->compaction_uuid = utils::UUID_gen::get_time_UUID();
+ return info;
+ }
+
protected:
compaction(column_family& cf, compaction_descriptor descriptor)
: _cf(cf)
@@ -500,6 +543,7 @@ class compaction {
, _sstables(std::move(descriptor.sstables))
, _max_sstable_size(descriptor.max_sstable_bytes)
, _sstable_level(descriptor.level)
+ , _info(create_compaction_info(cf, descriptor))
, _gc_sstable_writer_data(*this)
, _replacer(std::move(descriptor.replacer))
, _run_identifier(descriptor.run_identifier)
@@ -508,10 +552,6 @@ class compaction {
, _selector(_sstable_set ? _sstable_set->make_incremental_selector() : std::optional<sstable_set::incremental_selector>{})
, _compacting_for_max_purgeable_func(std::unordered_set<shared_sstable>(_sstables.begin(), _sstables.end()))
{
- _info->type = descriptor.options.type();
- _info->run_identifier = _run_identifier;
- _info->cf = &cf;
- _info->compaction_uuid = utils::UUID_gen::get_time_UUID();
for (auto& sst : _sstables) {
_stats_collector.update(sst->get_encoding_stats_for_compaction());
}
@@ -603,7 +643,7 @@ class compaction {
requires CompactedFragmentsConsumer<GCConsumer>
future<> setup(GCConsumer gc_consumer) {
auto ssts = make_lw_shared<sstables::sstable_set>(make_sstable_set_for_input());
- sstring formatted_msg = "{} [";
+ formatted_sstables_list formatted_msg;
auto fully_expired = get_fully_expired_sstables(_cf, _sstables, gc_clock::now() - _schema->gc_grace_seconds());
min_max_tracker<api::timestamp_type> timestamp_tracker;
@@ -616,7 +656,7 @@ class compaction {
_ancestors.push_back(sst->generation());
_info->start_size += sst->bytes_on_disk();
_info->total_partitions += sst->get_estimated_key_count();
- formatted_msg += format("{}:level={:d}:origin={}, ", sst->get_filename(), sst->get_sstable_level(), sst->get_origin());
+ formatted_msg += sst;
// Do not actually compact a sstable that is fully expired and can be safely
dropped without resurrecting old data.
@@ -640,11 +680,8 @@ class compaction {
// compacted sstables anyway (CL should be clean by then).
_rp = std::max(_rp, sst_stats.position);
}
- formatted_msg += "]";
_info->sstables = _sstables.size();
- _info->ks_name = _schema->ks_name();
- _info->cf_name = _schema->cf_name();
- log_info(formatted_msg, report_start_desc());
+ log_info("{} {}", report_start_desc(), formatted_msg);
if (ssts->all()->size() < _sstables.size()) {
log_debug("{} out of {} input sstables are fully expired sstables that will not be actually compacted",
_sstables.size() - ssts->all()->size(), _sstables.size());
@@ -685,20 +722,17 @@ class compaction {
auto ratio = double(_info->end_size) / double(_info->start_size);
auto duration = std::chrono::duration<float>(ended_at - started_at);
// Don't report NaN or negative number.
- sstring new_sstables_msg;
on_end_of_compaction();
- for (auto& newtab : _info->new_sstables) {
- new_sstables_msg += format("{}:level={:d}, ", newtab->get_filename(), newtab->get_sstable_level());
- }
+ formatted_sstables_list new_sstables_msg(_info->new_sstables, false);
// FIXME: there is some missing information in the log message below.
// look at CompactionTask::runMayThrow() in origin for reference.
// - add support to merge summary (message: Partition merge counts were {%s}.).
// - there is no easy way, currently, to know the exact number of total partitions.
// By the time being, using estimated key count.
- log_info("{} {} sstables to [{}]. {} to {} (~{}% of original) in {}ms = {}. ~{} total partitions merged to {}.",
+ log_info("{} {} sstables to {}. {} to {} (~{}% of original) in {}ms = {}. ~{} total partitions merged to {}.",
report_finish_desc(),
_info->sstables, new_sstables_msg, pretty_printed_data_size(_info->start_size), pretty_printed_data_size(_info->end_size), int(ratio * 100),
std::chrono::duration_cast<std::chrono::milliseconds>(duration).count(), pretty_printed_throughput(_info->end_size, duration),
@@ -1156,6 +1190,66 @@ class cleanup_compaction final : public regular_compaction {
};
class scrub_compaction final : public regular_compaction {
+public:
+ static void report_invalid_partition(compaction_type type, mutation_fragment_stream_validator& validator, const dht::decorated_key& new_key,
+ std::string_view action = "") {
+ const auto& schema = validator.schema();
+ const auto& current_key = validator.previous_partition_key();
+ clogger.error("[{} compaction {}.{}] Invalid partition {} ({}), partition is out-of-order compared to previous partition {} ({}){}{}",
+ type,
+ schema.ks_name(),
+ schema.cf_name(),
+ new_key.key().with_schema(schema),
+ new_key,
+ current_key.key().with_schema(schema),
+ current_key,
+ action.empty() ? "" : "; ",
+ action);
+ }
+ static void report_invalid_partition_start(compaction_type type, mutation_fragment_stream_validator& validator, const dht::decorated_key& new_key,
+ std::string_view action = "") {
+ const auto& schema = validator.schema();
+ const auto& current_key = validator.previous_partition_key();
+ clogger.error("[{} compaction {}.{}] Invalid partition start for partition {} ({}), previous partition {} ({}) didn't end with a partition-end fragment{}{}",
+ type,
+ schema.ks_name(),
+ schema.cf_name(),
+ new_key.key().with_schema(schema),
+ new_key,
+ current_key.key().with_schema(schema),
+ current_key,
+ action.empty() ? "" : "; ",
+ action);
+ }
+ static void report_invalid_mutation_fragment(compaction_type type, mutation_fragment_stream_validator& validator, const mutation_fragment& mf,
+ std::string_view action = "") {
+ const auto& schema = validator.schema();
+ const auto& key = validator.previous_partition_key();
+ const auto prev_pos = validator.previous_position();
+ clogger.error("[{} compaction {}.{}] Invalid {} fragment{} ({}) in partition {} ({}),"
+ " fragment is out-of-order compared to previous {} fragment{} ({}){}{}",
+ type,
+ schema.ks_name(),
+ schema.cf_name(),
+ mf.mutation_fragment_kind(),
+ mf.has_key() ? format(" with key {}", mf.key().with_schema(schema)) : "",
+ mf.position(),
+ key.key().with_schema(schema),
+ key,
+ prev_pos.region(),
+ prev_pos.has_key() ? format(" with key {}", prev_pos.key().with_schema(schema)) : "",
+ prev_pos,
+ action.empty() ? "" : "; ",
+ action);
+ }
+ static void report_invalid_end_of_stream(compaction_type type, mutation_fragment_stream_validator& validator, std::string_view action = "") {
+ const auto& schema = validator.schema();
+ const auto& key = validator.previous_partition_key();
+ clogger.error("[{} compaction {}.{}] Invalid end-of-stream, last partition {} ({}) didn't end with a partition-end fragment{}{}",
+ type, schema.ks_name(), schema.cf_name(), key.key().with_schema(schema), key, action.empty() ? "" : "; ", action);
+ }
+
+private:
class reader : public flat_mutation_reader::impl {
using skip = bool_class<class skip_tag>;
private:
@@ -1173,16 +1267,8 @@ class scrub_compaction final : public regular_compaction {
void on_unexpected_partition_start(const mutation_fragment& ps) {
maybe_abort_scrub();
- const auto& new_key = ps.as_partition_start().key();
- const auto& current_key = _validator.previous_partition_key();
- clogger.error("[scrub compaction {}.{}] Unexpected partition-start for partition {} ({}),"
- " rectifying by adding assumed missing partition-end to the current partition {} ({}).",
- _schema->ks_name(),
- _schema->cf_name(),
- new_key.key().with_schema(*_schema),
- new_key,
- current_key.key().with_schema(*_schema),
- current_key);
+ report_invalid_partition_start(compaction_type::Scrub, _validator, ps.as_partition_start().key(),
+ "Rectifying by adding assumed missing partition-end");
auto pe = mutation_fragment(*_schema, _permit, partition_end{});
if (!_validator(pe)) {
@@ -1205,27 +1291,13 @@ class scrub_compaction final : public regular_compaction {
skip on_invalid_partition(const dht::decorated_key& new_key) {
maybe_abort_scrub();
- const auto& current_key = _validator.previous_partition_key();
if (_scrub_mode == compaction_options::scrub::mode::segregate) {
- clogger.error("[scrub compaction {}.{}] Detected out-of-order partition {} ({}) (previous being {} ({}))",
- _schema->ks_name(),
- _schema->cf_name(),
- new_key.key().with_schema(*_schema),
- new_key,
- current_key.key().with_schema(*_schema),
- current_key);
+ report_invalid_partition(compaction_type::Scrub, _validator, new_key, "Detected");
_validator.reset(new_key);
// Let the segregating interposer consumer handle this.
return skip::no;
}
- clogger.error("[scrub compaction {}.{}] Skipping invalid partition {} ({}):"
- " partition has non-monotonic key compared to current one {} ({})",
- _schema->ks_name(),
- _schema->cf_name(),
- new_key.key().with_schema(*_schema),
- new_key,
- current_key.key().with_schema(*_schema),
- current_key);
+ report_invalid_partition(compaction_type::Scrub, _validator, new_key, "Skipping");
_skip_to_next_partition = true;
return skip::yes;
}
@@ -1239,14 +1311,8 @@ class scrub_compaction final : public regular_compaction {
// The only case a partition end is invalid is when it comes after
// another partition end, and we can just drop it in that case.
if (!mf.is_end_of_partition() && _scrub_mode == compaction_options::scrub::mode::segregate) {
- clogger.error("[scrub compaction {}.{}] Injecting partition start/end to segregate out-of-order fragment {} (previous position being {}) in partition {} ({}):",
- _schema->ks_name(),
- _schema->cf_name(),
- mf.position(),
- _validator.previous_position(),
- key.key().with_schema(*_schema),
- key);
-
+ report_invalid_mutation_fragment(compaction_type::Scrub, _validator, mf,
+ "Injecting partition start/end to segregate out-of-order fragment");
push_mutation_fragment(*_schema, _permit, partition_end{});
// We lose the partition tombstone if any, but it will be
@@ -1259,25 +1325,16 @@ class scrub_compaction final : public regular_compaction {
return skip::no;
}
- clogger.error("[scrub compaction {}.{}] Skipping invalid {} fragment {}in partition {} ({}):"
- " fragment has non-monotonic position {} compared to previous position {}.",
- _schema->ks_name(),
- _schema->cf_name(),
- mf.mutation_fragment_kind(),
- mf.has_key() ? format("with key {} ({}) ", mf.key().with_schema(*_schema), mf.key()) : "",
- key.key().with_schema(*_schema),
- key,
- mf.position(),
- _validator.previous_position());
+ report_invalid_mutation_fragment(compaction_type::Scrub, _validator, mf, "Skipping");
+
return skip::yes;
}
void on_invalid_end_of_stream() {
maybe_abort_scrub();
// Handle missing partition_end
push_mutation_fragment(mutation_fragment(*_schema, _permit, partition_end{}));
- clogger.error("[scrub compaction {}.{}] Adding missing partition-end to the end of the stream.",
- _schema->ks_name(), _schema->cf_name());
+ report_invalid_end_of_stream(compaction_type::Scrub, _validator, "Rectifying by adding missing partition-end to the end of the stream");
}
void fill_buffer_from_underlying() {
@@ -1533,11 +1590,13 @@ compaction_type compaction_options::type() const {
static const compaction_type index_to_type[] = {
compaction_type::Compaction,
compaction_type::Cleanup,
+ compaction_type::Validation,
compaction_type::Upgrade,
compaction_type::Scrub,
compaction_type::Reshard,
compaction_type::Reshape,
};
+ static_assert(std::variant_size_v<compaction_options::options_variant> == std::size(index_to_type));
return index_to_type[_options.index()];
}
@@ -1558,6 +1617,9 @@ static std::unique_ptr<compaction> make_compaction(column_family& cf, sstables::
std::unique_ptr<compaction> operator()(compaction_options::cleanup options) {
return std::make_unique<cleanup_compaction>(cf, std::move(descriptor), std::move(options));
}
+ std::unique_ptr<compaction> operator()(compaction_options::validation) {
+ return nullptr; // this compaction doesn't go through the regular path
+ }
std::unique_ptr<compaction> operator()(compaction_options::upgrade options) {
return std::make_unique<cleanup_compaction>(cf, std::move(descriptor), std::move(options));
}
@@ -1569,12 +1631,100 @@ static std::unique_ptr<compaction> make_compaction(column_family& cf, sstables::
return descriptor.options.visit(visitor_factory);
}
+future<bool> validate_compaction_validate_reader(flat_mutation_reader reader, const compaction_info& info) {
+ auto schema = reader.schema();
+
+ bool valid = true;
+ std::exception_ptr ex;
+
+ try {
+ auto validator = mutation_fragment_stream_validator(*schema);
+
+ while (auto mf_opt = co_await reader(db::no_timeout)) {
+ if (info.is_stop_requested()) [[unlikely]] {
+ // Compaction manager will catch this exception and re-schedule the compaction.
+ throw compaction_stop_exception(info.ks_name, info.cf_name, info.stop_requested);
+ }
+
+ const auto& mf = *mf_opt;
+
+ if (mf.is_partition_start()) {
+ const auto& ps = mf.as_partition_start();
+ if (!validator(mf)) {
+ scrub_compaction::report_invalid_partition_start(compaction_type::Validation, validator, ps.key());
+ validator.reset(mf);
+ valid = false;
+ }
+ if (!validator(ps.key())) {
+ scrub_compaction::report_invalid_partition(compaction_type::Validation, validator, ps.key());
+ validator.reset(ps.key());
+ valid = false;
+ }
+ } else {
+ if (!validator(mf)) {
+ scrub_compaction::report_invalid_mutation_fragment(compaction_type::Validation, validator, mf);
+ validator.reset(mf);
+ valid = false;
+ }
+ }
+ }
+ if (!validator.on_end_of_stream()) {
+ scrub_compaction::report_invalid_end_of_stream(compaction_type::Validation, validator);
+ valid = false;
+ }
+ } catch (...) {
+ ex = std::current_exception();
+ }
+
+ co_await reader.close();
+
+ if (ex) {
+ std::rethrow_exception(std::move(ex));
+ }
+
+ co_return valid;
+}
+
+static future<compaction_info> validate_sstables(sstables::compaction_descriptor descriptor, column_family& cf) {
+ auto schema = cf.schema();
+
+ formatted_sstables_list sstables_list_msg;
+ auto sstables = make_lw_shared<sstables::sstable_set>(sstables::make_partitioned_sstable_set(schema, make_lw_shared<sstable_list>(sstable_list{}), false));
+ for (const auto& sst : descriptor.sstables) {
+ sstables_list_msg += sst;
+ sstables->insert(sst);
+ }
+
+ auto info = compaction::create_compaction_info(cf, descriptor);
+ info->sstables = descriptor.sstables.size();
+ cf.get_compaction_manager().register_compaction(info);
+ auto deregister_compaction = defer([&cf, info] {
+ cf.get_compaction_manager().deregister_compaction(info);
+ });
+
+ clogger.info("Validating {}", sstables_list_msg);
+
+ auto permit = cf.compaction_concurrency_semaphore().make_permit(schema.get(), "Validation");
+ auto reader = sstables->make_local_shard_sstable_reader(schema, permit, query::full_partition_range, schema->full_slice(), descriptor.io_priority,
+ tracing::trace_state_ptr(), ::streamed_mutation::forwarding::no, ::mutation_reader::forwarding::no, default_read_monitor_generator());
+
+ const auto valid = co_await validate_compaction_validate_reader(std::move(reader), *info);
+
+ clogger.info("Validated {} - sstable(s) are {}", sstables_list_msg, valid ? "valid" : "invalid");
+
+ co_return *info;
+}
+
future<compaction_info>
compact_sstables(sstables::compaction_descriptor descriptor, column_family& cf) {
if (descriptor.sstables.empty()) {
throw std::runtime_error(format("Called {} compaction with empty set on behalf of {}.{}", compaction_name(descriptor.options.type()),
cf.schema()->ks_name(), cf.schema()->cf_name()));
}
+ if (descriptor.options.type() == compaction_type::Validation) {
+ // Bypass the usual compaction machinery for validation compaction
+ return validate_sstables(std::move(descriptor), cf);
+ }
auto c = make_compaction(cf, std::move(descriptor));
if (c->enable_garbage_collected_sstable_writer()) {
auto gc_writer = c->make_garbage_collected_sstable_writer();
diff --git a/compaction/compaction.hh b/compaction/compaction.hh
--- a/compaction/compaction.hh
+++ b/compaction/compaction.hh
@@ -112,4 +112,7 @@ namespace sstables {
// For tests, can drop after we virtualize sstables.
flat_mutation_reader make_scrubbing_reader(flat_mutation_reader rd, compaction_options::scrub::mode scrub_mode);
+
+ // For tests, can drop after we virtualize sstables.
+ future<bool> validate_compaction_validate_reader(flat_mutation_reader rd, const compaction_info& info);
}
diff --git a/compaction/compaction_descriptor.hh b/compaction/compaction_descriptor.hh
--- a/compaction/compaction_descriptor.hh
+++ b/compaction/compaction_descriptor.hh
@@ -69,6 +69,8 @@ public:
struct cleanup {
std::reference_wrapper<database> db;
};
+ struct validation {
+ };
struct upgrade {
std::reference_wrapper<database> db;
};
@@ -85,7 +87,7 @@ public:
struct reshape {
};
private:
- using options_variant = std::variant<regular, cleanup, upgrade, scrub, reshard, reshape>;
+ using options_variant = std::variant<regular, cleanup, validation, upgrade, scrub, reshard, reshape>;
private:
options_variant _options;
@@ -111,6 +113,10 @@ public:
return compaction_options(cleanup{db});
}
+ static compaction_options make_validation() {
+ return compaction_options(validation{});
+ }
+
static compaction_options make_upgrade(database& db) {
return compaction_options(upgrade{db});
}
diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc
--- a/compaction/compaction_manager.cc
+++ b/compaction/compaction_manager.cc
@@ -27,6 +27,7 @@
#include "database.hh"
#include "service/storage_service.hh"
#include <seastar/core/metrics.hh>
+#include <seastar/core/coroutine.hh>
#include "sstables/exceptions.hh"
#include "locator/abstract_replication_strategy.hh"
#include <cmath>
@@ -262,7 +263,7 @@ future<> compaction_manager::submit_major_compaction(column_family* cf) {
compaction_backlog_tracker user_initiated(std::make_unique<user_initiated_backlog_tracker>(_compaction_controller.backlog_of_shares(200), _available_memory));
return do_with(std::move(user_initiated), [this, cf, descriptor = std::move(descriptor)] (compaction_backlog_tracker& bt) mutable {
register_backlog_tracker(bt);
- return with_scheduling_group(_scheduling_group, [this, cf, descriptor = std::move(descriptor)] () mutable {
+ return with_scheduling_group(_compaction_controller.sg(), [this, cf, descriptor = std::move(descriptor)] () mutable {
return cf->compact_sstables(std::move(descriptor));
});
}).then([compacting = std::move(compacting)] {});
@@ -284,18 +285,21 @@ future<> compaction_manager::submit_major_compaction(column_family* cf) {
return task->compaction_done.get_future().then([task] {});
}
-future<> compaction_manager::run_custom_job(column_family* cf, sstring name, noncopyable_function<future<>()> job) {
+future<> compaction_manager::run_custom_job(column_family* cf, sstables::compaction_type type, noncopyable_function<future<>()> job) {
if (_state != state::enabled) {
return make_ready_future<>();
}
auto task = make_lw_shared<compaction_manager::task>();
task->compacting_cf = cf;
+ task->type = type;
_tasks.push_back(task);
- task->compaction_done = with_semaphore(_custom_job_sem, 1, [this, task, cf, job = std::move(job)] () mutable {
+ auto job_ptr = std::make_unique<noncopyable_function<future<>()>>(std::move(job));
+
+ task->compaction_done = with_semaphore(_custom_job_sem, 1, [this, task, cf, &job = *job_ptr] () mutable {
// take read lock for cf, so major compaction and resharding can't proceed in parallel.
- return with_lock(_compaction_locks[cf].for_read(), [this, task, cf, job = std::move(job)] () mutable {
+ return with_lock(_compaction_locks[cf].for_read(), [this, task, cf, &job] () mutable {
_stats.active_tasks++;
if (!can_proceed(task)) {
return make_ready_future<>();
@@ -306,15 +310,15 @@ future<> compaction_manager::run_custom_job(column_family* cf, sstring name, non
// compaction and some of them may not even belong to current shard.
return job();
});
- }).then_wrapped([this, task, name] (future<> f) {
+ }).then_wrapped([this, task, job_ptr = std::move(job_ptr)] (future<> f) {
_stats.active_tasks--;
_tasks.remove(task);
try {
f.get();
} catch (sstables::compaction_stop_exception& e) {
- cmlog.info("{} was abruptly stopped, reason: {}", name, e.what());
+ cmlog.info("{} was abruptly stopped, reason: {}", task->type, e.what());
} catch (...) {
- cmlog.error("{} failed: {}", name, std::current_exception());
+ cmlog.error("{} failed: {}", task->type, std::current_exception());
throw;
}
});
@@ -330,8 +334,8 @@ future<> compaction_manager::task_stop(lw_shared_ptr<compaction_manager::task> t
});
}
-compaction_manager::compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory, abort_source& as)
- : _compaction_controller(sg, iop, 250ms, [this, available_memory] () -> float {
+compaction_manager::compaction_manager(compaction_scheduling_group csg, maintenance_scheduling_group msg, size_t available_memory, abort_source& as)
: _compaction_controller(csg.cpu, csg.io, 250ms, [this, available_memory] () -> float {
_last_backlog = backlog();
auto b = _last_backlog / available_memory;
// This means we are using an unimplemented strategy
@@ -344,7 +348,7 @@ compaction_manager::compaction_manager(seastar::scheduling_group sg, const ::io_
return b;
})
, _backlog_manager(_compaction_controller)
- , _scheduling_group(_compaction_controller.sg())
+ , _maintenance_sg(msg)
, _available_memory(available_memory)
, _early_abort_subscription(as.subscribe([this] () noexcept {
do_stop();
@@ -353,10 +357,10 @@ compaction_manager::compaction_manager(seastar::scheduling_group sg, const ::io_
register_metrics();
}
-compaction_manager::compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory, uint64_t shares, abort_source& as)
- : _compaction_controller(sg, iop, shares)
+compaction_manager::compaction_manager(compaction_scheduling_group csg, maintenance_scheduling_group msg, size_t available_memory, uint64_t shares, abort_source& as)
+ : _compaction_controller(csg.cpu, csg.io, shares)
, _backlog_manager(_compaction_controller)
- , _scheduling_group(_compaction_controller.sg())
+ , _maintenance_sg(msg)
, _available_memory(available_memory)
, _early_abort_subscription(as.subscribe([this] () noexcept {
do_stop();
@@ -368,7 +372,7 @@ compaction_manager::compaction_manager(seastar::scheduling_group sg, const ::io_
compaction_manager::compaction_manager()
: _compaction_controller(seastar::default_scheduling_group(), default_priority_class(), 1)
, _backlog_manager(_compaction_controller)
- , _scheduling_group(_compaction_controller.sg())
+ , _maintenance_sg(maintenance_scheduling_group{default_scheduling_group(), default_priority_class()})
, _available_memory(1)
{
// No metric registration because this constructor is supposed to be used only by the testing
@@ -562,7 +566,7 @@ void compaction_manager::submit(column_family* cf) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
return with_lock(_compaction_locks[cf].for_read(), [this, task] () mutable {
- return with_scheduling_group(_scheduling_group, [this, task = std::move(task)] () mutable {
+ return with_scheduling_group(_compaction_controller.sg(), [this, task = std::move(task)] () mutable {
column_family& cf = *task->compacting_cf;
sstables::compaction_strategy cs = cf.get_compaction_strategy();
sstables::compaction_descriptor descriptor = cs.get_sstables_for_compaction(cf, get_candidates(cf));
@@ -704,8 +708,7 @@ future<> compaction_manager::rewrite_sstables(column_family* cf, sstables::compa
column_family& cf = *task->compacting_cf;
auto sstable_level = sst->get_sstable_level();
auto run_identifier = sst->run_identifier();
- // FIXME: this compaction should run with maintenance priority.
- auto descriptor = sstables::compaction_descriptor({ sst }, cf.get_sstable_set(), service::get_local_compaction_priority(),
+ auto descriptor = sstables::compaction_descriptor({ sst }, cf.get_sstable_set(), _maintenance_sg.io,
sstable_level, sstables::compaction_descriptor::default_max_sstable_bytes, run_identifier, options);
// Releases reference to cleaned sstable such that respective used disk space can be freed.
@@ -719,7 +722,7 @@ future<> compaction_manager::rewrite_sstables(column_family* cf, sstables::compa
task->compaction_running = true;
compaction_backlog_tracker user_initiated(std::make_unique<user_initiated_backlog_tracker>(_compaction_controller.backlog_of_shares(200), _available_memory));
return do_with(std::move(user_initiated), [this, &cf, descriptor = std::move(descriptor)] (compaction_backlog_tracker& bt) mutable {
- return with_scheduling_group(_scheduling_group, [this, &cf, descriptor = std::move(descriptor)]() mutable {
+ return with_scheduling_group(_maintenance_sg.cpu, [this, &cf, descriptor = std::move(descriptor)]() mutable {
return cf.run_compaction(std::move(descriptor));
});
});
@@ -750,6 +753,48 @@ future<> compaction_manager::rewrite_sstables(column_family* cf, sstables::compa
return task->compaction_done.get_future().then([task] {});
}
+future<> compaction_manager::perform_sstable_validation(column_family* cf) {
+ return run_custom_job(cf, sstables::compaction_type::Validation, [this, &cf = *cf, sstables = get_candidates(*cf)] () mutable -> future<> {
+ class pending_tasks {
+ compaction_manager::stats& _stats;
+ size_t _n;
+ public:
+ pending_tasks(compaction_manager::stats& stats, size_t n) : _stats(stats), _n(n) { _stats.pending_tasks += _n; }
+ ~pending_tasks() { _stats.pending_tasks -= _n; }
+ void operator--(int) {
+ --_stats.pending_tasks;
+ --_n;
+ }
+ };
+ pending_tasks pending(_stats, sstables.size());
+
+ while (!sstables.empty()) {
+ auto sst = sstables.back();
+ sstables.pop_back();
+
+ try {
+ co_await with_scheduling_group(_maintenance_sg.cpu, [&] () {
+ auto desc = sstables::compaction_descriptor({ sst }, {}, _maintenance_sg.io, sst->get_sstable_level(),
+ sstables::compaction_descriptor::default_max_sstable_bytes, sst->run_identifier(), sstables::compaction_options::make_validation());
+ return compact_sstables(std::move(desc), cf);
+ });
+ } catch (sstables::compaction_stop_exception&) {
+ throw; // let run_custom_job() handle this
+ } catch (storage_io_error&) {
+ throw; // let run_custom_job() handle this
+ } catch (...) {
+ // We are validating potentially corrupt sstables, so errors are
+ // expected; just continue with the other sstables when seeing
+ // one.
+ _stats.errors++;
+ cmlog.error("Validating {} failed due to {}, continuing.", sst->get_filename(), std::current_exception());
+ }
+
+ pending--;
+ }
+ });
+}
+
bool needs_cleanup(const sstables::shared_sstable& sst,
const dht::token_range_vector& sorted_owned_ranges,
schema_ptr s) {
diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh
--- a/compaction/compaction_manager.hh
+++ b/compaction/compaction_manager.hh
@@ -57,6 +57,14 @@ public:
uint64_t active_tasks = 0; // Number of compaction going on.
int64_t errors = 0;
};
+ struct compaction_scheduling_group {
+ seastar::scheduling_group cpu;
+ const ::io_priority_class& io;
+ };
+ struct maintenance_scheduling_group {
+ seastar::scheduling_group cpu;
+ const ::io_priority_class& io;
+ };
private:
struct task {
column_family* compacting_cf = nullptr;
@@ -157,7 +165,7 @@ private:
compaction_controller _compaction_controller;
compaction_backlog_manager _backlog_manager;
- seastar::scheduling_group _scheduling_group;
+ maintenance_scheduling_group _maintenance_sg;
size_t _available_memory;
using get_candidates_func = std::function<std::vector<sstables::shared_sstable>(const column_family&)>;
@@ -167,8 +175,8 @@ private:
future<> stop_ongoing_compactions(sstring reason);
optimized_optional<abort_source::subscription> _early_abort_subscription;
public:
- compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory, abort_source& as);
- compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory, uint64_t shares, abort_source& as);
+ compaction_manager(compaction_scheduling_group csg, maintenance_scheduling_group msg, size_t available_memory, abort_source& as);
+ compaction_manager(compaction_scheduling_group csg, maintenance_scheduling_group msg, size_t available_memory, uint64_t shares, abort_source& as);
compaction_manager();
~compaction_manager();
@@ -212,6 +220,13 @@ public:
// Submit a column family to be scrubbed and wait for its termination.
future<> perform_sstable_scrub(column_family* cf, sstables::compaction_options::scrub::mode scrub_mode);
+ // Submit a column family to be validated and wait for its termination.
+ //
+ // Validation compaction reads each sstable individually, passing the
+ // fragment stream through mutation fragment stream validator, logging any
+ // errors found.
+ future<> perform_sstable_validation(column_family* cf);
+
// Submit a column family for major compaction.
future<> submit_major_compaction(column_family* cf);
@@ -220,8 +235,10 @@ public:
// it completes when future returned by job is ready or returns immediately
// if manager was asked to stop.
//
+ // parameter type is the compaction type the operation can most closely be
+ // associated with; use compaction_type::Compaction if none apply.
// parameter job is a function that will carry the operation
- future<> run_custom_job(column_family* cf, sstring name, noncopyable_function<future<>()> job);
+ future<> run_custom_job(column_family* cf, sstables::compaction_type type, noncopyable_function<future<>()> job);
// Remove a column family from the compaction manager.
// Cancel requests on cf and wait for a possible ongoing compaction on cf.
diff --git a/database.cc b/database.cc
--- a/database.cc
+++ b/database.cc
@@ -91,9 +91,18 @@ inline
std::unique_ptr<compaction_manager>
make_compaction_manager(const db::config& cfg, database_config& dbcfg, abort_source& as) {
if (cfg.compaction_static_shares() > 0) {
- return std::make_unique<compaction_manager>(dbcfg.compaction_scheduling_group, service::get_local_compaction_priority(), dbcfg.available_memory, cfg.compaction_static_shares(), as);
- }
- return std::make_unique<compaction_manager>(dbcfg.compaction_scheduling_group, service::get_local_compaction_priority(), dbcfg.available_memory, as);
+ return std::make_unique<compaction_manager>(
+ compaction_manager::compaction_scheduling_group{dbcfg.compaction_scheduling_group, service::get_local_compaction_priority()},
+ compaction_manager::maintenance_scheduling_group{dbcfg.streaming_scheduling_group, service::get_local_streaming_priority()},
+ dbcfg.available_memory,
+ cfg.compaction_static_shares(),
+ as);
+ }
+ return std::make_unique<compaction_manager>(
+ compaction_manager::compaction_scheduling_group{dbcfg.compaction_scheduling_group, service::get_local_compaction_priority()},
+ compaction_manager::maintenance_scheduling_group{dbcfg.streaming_scheduling_group, service::get_local_streaming_priority()},
+ dbcfg.available_memory,
+ as);
}
lw_shared_ptr<keyspace_metadata>
diff --git a/flat_mutation_reader.cc b/flat_mutation_reader.cc
--- a/flat_mutation_reader.cc
+++ b/flat_mutation_reader.cc
@@ -1025,7 +1025,7 @@ invalid_mutation_fragment_stream::invalid_mutation_fragment_stream(std::runtime_
}
-mutation_fragment_stream_validator::mutation_fragment_stream_validator(const schema& s)
+mutation_fragment_stream_validator::mutation_fragment_stream_validator(const ::schema& s)
: _schema(s)
, _prev_kind(mutation_fragment::kind::partition_end)
, _prev_pos(position_in_partition::end_of_partition_tag_t{})
diff --git a/mutation_fragment_stream_validator.hh b/mutation_fragment_stream_validator.hh
--- a/mutation_fragment_stream_validator.hh
+++ b/mutation_fragment_stream_validator.hh
@@ -44,6 +44,8 @@ class mutation_fragment_stream_validator {
public:
explicit mutation_fragment_stream_validator(const schema& s);
+ const ::schema& schema() const { return _schema; }
+
/// Validate the monotonicity of the fragment kind.
///
/// Should be used when the full, more heavy-weight position-in-partition
diff --git a/position_in_partition.hh b/position_in_partition.hh
--- a/position_in_partition.hh
+++ b/position_in_partition.hh
@@ -187,7 +187,9 @@ public:
return is_partition_end() || (_ck && _ck->is_empty(s) && _bound_weight == bound_weight::after_all_prefixed);
}
- // Valid when >= before_all_clustered_rows()
+ bool has_key() const { return bool(_ck); }
+
+ // Valid when has_key() == true
const clustering_key_prefix& key() const {
return *_ck;
}
@@ -344,6 +346,8 @@ public:
}
}
+ bool has_key() const { return bool(_ck); }
+
const clustering_key_prefix& key() const {
return *_ck;
}
diff --git a/sstables/sstable_directory.cc b/sstables/sstable_directory.cc
--- a/sstables/sstable_directory.cc
+++ b/sstables/sstable_directory.cc
@@ -356,7 +356,7 @@ future<uint64_t> sstable_directory::reshape(compaction_manager& cm, table& table
desc.creator = creator;
- return cm.run_custom_job(&table, "reshape", [this, &table, sstlist = std::move(sstlist), desc = std::move(desc)] () mutable {
+ return cm.run_custom_job(&table, compaction_type::Reshape, [this, &table, sstlist = std::move(sstlist), desc = std::move(desc)] () mutable {
return sstables::compact_sstables(std::move(desc), table).then([this, sstlist = std::move(sstlist)] (sstables::compaction_info result) mutable {
return remove_input_sstables_from_reshaping(std::move(sstlist)).then([this, new_sstables = std::move(result.new_sstables)] () mutable {
return collect_output_sstables_from_reshaping(std::move(new_sstables));
@@ -402,7 +402,7 @@ sstable_directory::reshard(sstable_info_vector shared_info, compaction_manager&
// parallel_for_each so the statistics about pending jobs are updated to reflect all
// jobs. But only one will run in parallel at a time
return parallel_for_each(buckets, [this, iop, &cm, &table, creator = std::move(creator)] (std::vector<sstables::shared_sstable>& sstlist) mutable {
- return cm.run_custom_job(&table, "resharding", [this, iop, &cm, &table, creator, &sstlist] () {
+ return cm.run_custom_job(&table, compaction_type::Reshard, [this, iop, &cm, &table, creator, &sstlist] () {
sstables::compaction_descriptor desc(sstlist, {}, iop);
desc.options = sstables::compaction_options::make_reshard();
desc.creator = std::move(creator);
diff --git a/test/boost/sstable_datafile_test.cc b/test/boost/sstable_datafile_test.cc
--- a/test/boost/sstable_datafile_test.cc
+++ b/test/boost/sstable_datafile_test.cc
@@ -4545,7 +4545,93 @@ SEASTAR_TEST_CASE(sstable_cleanup_correctness_test) {
});
}
-SEASTAR_TEST_CASE(sstable_scrub_skip_mode_test) {
+std::vector<mutation_fragment> write_corrupt_sstable(test_env& env, sstable& sst, reader_permit permit,
+ std::function<void(mutation_fragment&&, bool)> write_to_secondary) {
+ auto schema = sst.get_schema();
+ std::vector<mutation_fragment> corrupt_fragments;
+
+ const auto ts = api::timestamp_type{1};
+
+ auto local_keys = make_local_keys(3, schema);
+
+ auto config = env.manager().configure_writer();
+ config.validation_level = mutation_fragment_stream_validation_level::partition_region; // this test violates key order on purpose
+ auto writer = sst.get_writer(*schema, local_keys.size(), config, encoding_stats{});
+
+ auto make_static_row = [&, schema, ts] {
+ auto r = row{};
+ auto cdef = schema->static_column_at(0);
+ auto ac = atomic_cell::make_live(*cdef.type, ts, cdef.type->decompose(data_value(1)));
+ r.apply(cdef, atomic_cell_or_collection{std::move(ac)});
+ return static_row(*schema, std::move(r));
+ };
+
+ auto make_clustering_row = [&, schema, ts] (unsigned i) {
+ auto r = row{};
+ auto cdef = schema->regular_column_at(0);
+ auto ac = atomic_cell::make_live(*cdef.type, ts, cdef.type->decompose(data_value(1)));
+ r.apply(cdef, atomic_cell_or_collection{std::move(ac)});
+ return clustering_row(clustering_key::from_single_value(*schema, int32_type->decompose(data_value(int(i)))), {}, {}, std::move(r));
+ };
+
+ auto write_partition = [&, schema, ts] (int pk, bool is_corrupt) {
+ auto pkey = partition_key::from_deeply_exploded(*schema, { local_keys.at(pk) });
+ auto dkey = dht::decorate_key(*schema, pkey);
+
+ testlog.trace("Writing partition {}", pkey.with_schema(*schema));
+
+ write_to_secondary(mutation_fragment(*schema, permit, partition_start(dkey, {})), is_corrupt);
+ corrupt_fragments.emplace_back(*schema, permit, partition_start(dkey, {}));
+ writer.consume_new_partition(dkey);
+
+ {
+ auto sr = make_static_row();
+
+ testlog.trace("Writing row {}", sr.position());
+
+ write_to_secondary(mutation_fragment(*schema, permit, static_row(*schema, sr)), is_corrupt);
+ corrupt_fragments.emplace_back(*schema, permit, static_row(*schema, sr));
+ writer.consume(std::move(sr));
+ }
+
+ const unsigned rows_count = 10;
+ for (unsigned i = 0; i < rows_count; ++i) {
+ auto cr = make_clustering_row(i);
+
+ testlog.trace("Writing row {}", cr.position());
+
+ write_to_secondary(mutation_fragment(*schema, permit, clustering_row(*schema, cr)), is_corrupt);
+ corrupt_fragments.emplace_back(*schema, permit, clustering_row(*schema, cr));
+ writer.consume(clustering_row(*schema, cr));
+
+ // write row twice
+ if (i == (rows_count / 2)) {
+ auto bad_cr = make_clustering_row(i - 2);
+ testlog.trace("Writing out-of-order row {}", bad_cr.position());
+ write_to_secondary(mutation_fragment(*schema, permit, clustering_row(*schema, cr)), true);
+ corrupt_fragments.emplace_back(*schema, permit, clustering_row(*schema, bad_cr));
+ writer.consume(std::move(bad_cr));
+ }
+ }
+
+ testlog.trace("Writing partition_end");
+
+ write_to_secondary(mutation_fragment(*schema, permit, partition_end{}), is_corrupt);
+ corrupt_fragments.emplace_back(*schema, permit, partition_end{});
+ writer.consume_end_of_partition();
+ };
+
+ write_partition(1, false);
+ write_partition(0, true);
+ write_partition(2, false);
+
+ testlog.info("Writing done");
+ writer.consume_end_of_stream();
+
+ return corrupt_fragments;
+}
+
+SEASTAR_TEST_CASE(sstable_validation_test) {
cql_test_config test_cfg;
auto& db_cfg = *test_cfg.db_config;
@@ -4573,98 +4659,218 @@ SEASTAR_TEST_CASE(sstable_scrub_skip_mode_test) {
return env.make_sstable(schema, tmp.path().string(), (*gen)++);
};
- std::vector<mutation_fragment> corrupt_fragments;
- std::vector<mutation_fragment> scrubbed_fragments;
+ auto scrubbed_mt = make_lw_shared<memtable>(schema);
auto sst = sst_gen();
testlog.info("Writing sstable {}", sst->get_filename());
- {
- const auto ts = api::timestamp_type{1};
+ const auto corrupt_fragments = write_corrupt_sstable(env, *sst, permit, [&, mut = std::optional<mutation>()] (mutation_fragment&& mf, bool) mutable {
+ if (mf.is_partition_start()) {
+ mut.emplace(schema, mf.as_partition_start().key());
+ } else if (mf.is_end_of_partition()) {
+ scrubbed_mt->apply(std::move(*mut));
+ mut.reset();
+ } else {
+ mut->apply(std::move(mf));
+ }
+ });
- auto local_keys = make_local_keys(3, schema);
+ sst->load().get();
- auto config = env.manager().configure_writer();
- config.validation_level = mutation_fragment_stream_validation_level::partition_region; // this test violates key order on purpose
- auto writer = sst->get_writer(*schema, local_keys.size(), config, encoding_stats{});
+ testlog.info("Loaded sstable {}", sst->get_filename());
- auto make_static_row = [&, schema, ts] {
- auto r = row{};
- auto cdef = schema->static_column_at(0);
- auto ac = atomic_cell::make_live(*cdef.type, ts, cdef.type->decompose(data_value(1)));
- r.apply(cdef, atomic_cell_or_collection{std::move(ac)});
- return static_row(*schema, std::move(r));
- };
+ auto cfg = column_family_test_config(env.manager(), env.semaphore());
+ cfg.datadir = tmp.path().string();
+ auto table = make_lw_shared<column_family>(schema, cfg, column_family::no_commitlog(),
+ db.get_compaction_manager(), cl_stats, db.row_cache_tracker());
+ auto stop_table = defer([table] {
+ table->stop().get();
+ });
+ table->mark_ready_for_writes();
+ table->start();
- auto make_clustering_row = [&, schema, ts] (unsigned i) {
- auto r = row{};
- auto cdef = schema->regular_column_at(0);
- auto ac = atomic_cell::make_live(*cdef.type, ts, cdef.type->decompose(data_value(1)));
- r.apply(cdef, atomic_cell_or_collection{std::move(ac)});
- return clustering_row(clustering_key::from_single_value(*schema, int32_type->decompose(data_value(int(i)))), {}, {}, std::move(r));
- };
+ table->add_sstable_and_update_cache(sst).get();
- auto write_partition = [&, schema, ts] (int pk, bool write_to_scrubbed) {
- auto pkey = partition_key::from_deeply_exploded(*schema, { local_keys.at(pk) });
- auto dkey = dht::decorate_key(*schema, pkey);
+ BOOST_REQUIRE(table->in_strategy_sstables().size() == 1);
+ BOOST_REQUIRE(table->in_strategy_sstables().front() == sst);
- testlog.trace("Writing partition {}", pkey.with_schema(*schema));
+ auto verify_fragments = [&] (sstables::shared_sstable sst, const std::vector<mutation_fragment>& mfs) {
+ auto r = assert_that(sst->as_mutation_source().make_reader(schema, env.make_reader_permit()));
+ for (const auto& mf : mfs) {
+ testlog.trace("Expecting {}", mutation_fragment::printer(*schema, mf));
+ r.produces(*schema, mf);
+ }
+ r.produces_end_of_stream();
+ };
- if (write_to_scrubbed) {
- scrubbed_fragments.emplace_back(*schema, permit, partition_start(dkey, {}));
- }
- corrupt_fragments.emplace_back(*schema, permit, partition_start(dkey, {}));
- writer.consume_new_partition(dkey);
+ testlog.info("Verifying written data...");
- {
- auto sr = make_static_row();
+ // Make sure we wrote what we thought we wrote.
+ verify_fragments(sst, corrupt_fragments);
- testlog.trace("Writing row {}", sr.position());
+ testlog.info("Validate");
- if (write_to_scrubbed) {
- scrubbed_fragments.emplace_back(*schema, permit, static_row(*schema, sr));
- }
- corrupt_fragments.emplace_back(*schema, permit, static_row(*schema, sr));
- writer.consume(std::move(sr));
- }
+ // No way to really test validation besides observing the log messages.
+ compaction_manager.perform_sstable_validation(table.get()).get();
- const unsigned rows_count = 10;
- for (unsigned i = 0; i < rows_count; ++i) {
- auto cr = make_clustering_row(i);
+ BOOST_REQUIRE(table->in_strategy_sstables().size() == 1);
+ BOOST_REQUIRE(table->in_strategy_sstables().front() == sst);
+ verify_fragments(sst, corrupt_fragments);
+ });
+ }, test_cfg);
+}
- testlog.trace("Writing row {}", cr.position());
+SEASTAR_THREAD_TEST_CASE(validation_compaction_validate_reader_test) {
+ auto schema = schema_builder("ks", get_name())
+ .with_column("pk", utf8_type, column_kind::partition_key)
+ .with_column("ck", int32_type, column_kind::clustering_key)
+ .with_column("s", int32_type, column_kind::static_column)
+ .with_column("v", int32_type).build();
+ tests::reader_concurrency_semaphore_wrapper semaphore;
+ auto permit = semaphore.make_permit();
- if (write_to_scrubbed) {
- scrubbed_fragments.emplace_back(*schema, permit, clustering_row(*schema, cr));
- }
- corrupt_fragments.emplace_back(*schema, permit, clustering_row(*schema, cr));
- writer.consume(clustering_row(*schema, cr));
-
- // write row twice
- if (i == (rows_count / 2)) {
- auto bad_cr = make_clustering_row(i - 2);
- testlog.trace("Writing out-of-order row {}", bad_cr.position());
- corrupt_fragments.emplace_back(*schema, permit, clustering_row(*schema, bad_cr));
- writer.consume(std::move(bad_cr));
- }
- }
+ std::deque<mutation_fragment> frags;
- testlog.trace("Writing partition_end");
+ const auto ts = api::timestamp_type{1};
+ auto local_keys = make_local_keys(5, schema);
- if (write_to_scrubbed) {
- scrubbed_fragments.emplace_back(*schema, permit, partition_end{});
- }
- corrupt_fragments.emplace_back(*schema, permit, partition_end{});
- writer.consume_end_of_partition();
- };
+ auto make_partition_start = [&, schema] (unsigned pk) {
+ auto pkey = partition_key::from_deeply_exploded(*schema, { local_keys.at(pk) });
+ auto dkey = dht::decorate_key(*schema, pkey);
+ return mutation_fragment(*schema, permit, partition_start(std::move(dkey), {}));
+ };
- write_partition(1, true);
- write_partition(0, false);
- write_partition(2, true);
+ auto make_partition_end = [&, schema] {
+ return mutation_fragment(*schema, permit, partition_end());
+ };
+
+ auto make_static_row = [&, schema, ts] {
+ auto r = row{};
+ auto cdef = schema->static_column_at(0);
+ auto ac = atomic_cell::make_live(*cdef.type, ts, cdef.type->decompose(data_value(1)));
+ r.apply(cdef, atomic_cell_or_collection{std::move(ac)});
+ return mutation_fragment(*schema, permit, static_row(*schema, std::move(r)));
+ };
+
+ auto make_clustering_row = [&, schema, ts] (unsigned i) {
+ auto r = row{};
+ auto cdef = schema->regular_column_at(0);
+ auto ac = atomic_cell::make_live(*cdef.type, ts, cdef.type->decompose(data_value(1)));
+ r.apply(cdef, atomic_cell_or_collection{std::move(ac)});
+ return mutation_fragment(*schema, permit,
+ clustering_row(clustering_key::from_single_value(*schema, int32_type->decompose(data_value(int(i)))), {}, {}, std::move(r)));
+ };
+
+ auto info = make_lw_shared<compaction_info>();
+
+ BOOST_TEST_MESSAGE("valid");
+ {
+ frags.emplace_back(make_partition_start(0));
+ frags.emplace_back(make_static_row());
+ frags.emplace_back(make_clustering_row(0));
+ frags.emplace_back(make_clustering_row(1));
+ frags.emplace_back(make_partition_end());
+ frags.emplace_back(make_partition_start(2));
+ frags.emplace_back(make_partition_end());
+
+ const auto valid = validate_compaction_validate_reader(make_flat_mutation_reader_from_fragments(schema, permit, std::move(frags)), *info).get();
+ BOOST_REQUIRE(valid);
+ }
+
+ BOOST_TEST_MESSAGE("out-of-order clustering row");
+ {
+ frags.emplace_back(make_partition_start(0));
+ frags.emplace_back(make_clustering_row(1));
+ frags.emplace_back(make_clustering_row(0));
+ frags.emplace_back(make_partition_end());
+
+ const auto valid = validate_compaction_validate_reader(make_flat_mutation_reader_from_fragments(schema, permit, std::move(frags)), *info).get();
+ BOOST_REQUIRE(!valid);
+ }
+
+ BOOST_TEST_MESSAGE("out-of-order static row");
+ {
+ frags.emplace_back(make_partition_start(0));
+ frags.emplace_back(make_clustering_row(0));
+ frags.emplace_back(make_static_row());
+ frags.emplace_back(make_partition_end());
+
+ const auto valid = validate_compaction_validate_reader(make_flat_mutation_reader_from_fragments(schema, permit, std::move(frags)), *info).get();
+ BOOST_REQUIRE(!valid);
+ }
+
+ BOOST_TEST_MESSAGE("out-of-order partition start");
+ {
+ frags.emplace_back(make_partition_start(0));
+ frags.emplace_back(make_clustering_row(1));
+ frags.emplace_back(make_partition_start(2));
+ frags.emplace_back(make_partition_end());
+
+ const auto valid = validate_compaction_validate_reader(make_flat_mutation_reader_from_fragments(schema, permit, std::move(frags)), *info).get();
+ BOOST_REQUIRE(!valid);
+ }
+
+ BOOST_TEST_MESSAGE("out-of-order partition");
+ {
+ frags.emplace_back(make_partition_start(2));
+ frags.emplace_back(make_clustering_row(0));
+ frags.emplace_back(make_partition_end());
+ frags.emplace_back(make_partition_start(0));
+ frags.emplace_back(make_partition_end());
+
+ const auto valid = validate_compaction_validate_reader(make_flat_mutation_reader_from_fragments(schema, permit, std::move(frags)), *info).get();
+ BOOST_REQUIRE(!valid);
+ }
+
+ BOOST_TEST_MESSAGE("missing end-of-partition at EOS");
+ {
+ frags.emplace_back(make_partition_start(0));
+ frags.emplace_back(make_clustering_row(0));
+
+ const auto valid = validate_compaction_validate_reader(make_flat_mutation_reader_from_fragments(schema, permit, std::move(frags)), *info).get();
+ BOOST_REQUIRE(!valid);
+ }
+}
+
+SEASTAR_TEST_CASE(sstable_scrub_skip_mode_test) {
+ cql_test_config test_cfg;
+
+ auto& db_cfg = *test_cfg.db_config;
+
+ // Disable cache to filter out its possible "corrections" to the corrupt sstable.
+ db_cfg.enable_cache(false);
+ db_cfg.enable_commitlog(false);
+
+ return do_with_cql_env([this] (cql_test_env& cql_env) -> future<> {
+ return test_env::do_with_async([this, &cql_env] (test_env& env) {
+ cell_locker_stats cl_stats;
+
+ auto& db = cql_env.local_db();
+ auto& compaction_manager = db.get_compaction_manager();
+
+ auto schema = schema_builder("ks", get_name())
+ .with_column("pk", utf8_type, column_kind::partition_key)
+ .with_column("ck", int32_type, column_kind::clustering_key)
+ .with_column("s", int32_type, column_kind::static_column)
+ .with_column("v", int32_type).build();
+ auto permit = env.make_reader_permit();
+
+ auto tmp = tmpdir();
+ auto sst_gen = [&env, schema, &tmp, gen = make_lw_shared<unsigned>(1)] () mutable {
+ return env.make_sstable(schema, tmp.path().string(), (*gen)++);
+ };
+
+ std::vector<mutation_fragment> scrubbed_fragments;
+ auto sst = sst_gen();
+
+ const auto corrupt_fragments = write_corrupt_sstable(env, *sst, permit, [&] (mutation_fragment&& mf, bool is_corrupt) {
+ if (!is_corrupt) {
+ scrubbed_fragments.emplace_back(std::move(mf));
+ }
+ });
+
+ testlog.info("Writing sstable {}", sst->get_filename());
- testlog.info("Writing done");
- writer.consume_end_of_stream();
- }
sst->load().get();
testlog.info("Loaded sstable {}", sst->get_filename());
@@ -4746,91 +4952,22 @@ SEASTAR_TEST_CASE(sstable_scrub_segregate_mode_test) {
return env.make_sstable(schema, tmp.path().string(), (*gen)++);
};
- std::vector<mutation_fragment> corrupt_fragments;
auto scrubbed_mt = make_lw_shared<memtable>(schema);
auto sst = sst_gen();
testlog.info("Writing sstable {}", sst->get_filename());
- {
- const auto ts = api::timestamp_type{1};
-
- auto local_keys = make_local_keys(3, schema);
-
- auto config = env.manager().configure_writer();
- config.validation_level = mutation_fragment_stream_validation_level::partition_region; // this test violates key order on purpose
- auto writer = sst->get_writer(*schema, local_keys.size(), config, encoding_stats{});
-
- auto make_static_row = [&, schema, ts] (mutation& mut) {
- auto r = row{};
- auto cdef = schema->static_column_at(0);
- r.apply(cdef, atomic_cell_or_collection{atomic_cell::make_live(*cdef.type, ts, cdef.type->decompose(data_value(1)))});
- mut.set_static_cell(cdef, atomic_cell_or_collection{atomic_cell::make_live(*cdef.type, ts, cdef.type->decompose(data_value(1)))});
- return static_row(*schema, std::move(r));
- };
-
- auto make_clustering_row = [&, schema, ts] (unsigned i, mutation* mut) {
- auto r = row{};
- auto cdef = schema->regular_column_at(0);
- auto ckey = clustering_key::from_single_value(*schema, int32_type->decompose(data_value(int(i))));
- r.apply(cdef, atomic_cell_or_collection{atomic_cell::make_live(*cdef.type, ts, cdef.type->decompose(data_value(1)))});
- if (mut) {
- mut->set_clustered_cell(ckey, cdef, atomic_cell_or_collection{atomic_cell::make_live(*cdef.type, ts, cdef.type->decompose(data_value(1)))});
- }
- return clustering_row(std::move(ckey), {}, {}, std::move(r));
- };
-
- auto write_partition = [&, schema, ts] (int pk) {
- auto pkey = partition_key::from_deeply_exploded(*schema, { local_keys.at(pk) });
- auto dkey = dht::decorate_key(*schema, pkey);
-
- testlog.trace("Writing partition {}", pkey);
-
- auto mut = mutation(schema, dkey);
- corrupt_fragments.emplace_back(*schema, permit, partition_start(dkey, {}));
- writer.consume_new_partition(dkey);
-
- {
- auto sr = make_static_row(mut);
-
- testlog.trace("Writing row {}", sr.position());
-
- corrupt_fragments.emplace_back(*schema, permit, static_row(*schema, sr));
- writer.consume(std::move(sr));
- }
-
- const unsigned rows_count = 10;
- for (unsigned i = 0; i < rows_count; ++i) {
- auto cr = make_clustering_row(i, &mut);
-
- testlog.trace("Writing row {}", cr.position());
-
- corrupt_fragments.emplace_back(*schema, permit, clustering_row(*schema, cr));
- writer.consume(clustering_row(*schema, cr));
-
- // write row twice
- if (i == (rows_count / 2)) {
- auto bad_cr = make_clustering_row(i - 2, nullptr);
- testlog.trace("Writing out-of-order row {}", bad_cr.position());
- corrupt_fragments.emplace_back(*schema, permit, clustering_row(*schema, bad_cr));
- writer.consume(std::move(bad_cr));
- }
- }
-
- testlog.trace("Writing partition_end");
-
- corrupt_fragments.emplace_back(*schema, permit, partition_end{});
- writer.consume_end_of_partition();
- scrubbed_mt->apply(mut);
- };
-
- write_partition(1);
- write_partition(0);
- write_partition(2);
+ const auto corrupt_fragments = write_corrupt_sstable(env, *sst, permit, [&, mut = std::optional<mutation>()] (mutation_fragment&& mf, bool) mutable {
+ if (mf.is_partition_start()) {
+ mut.emplace(schema, mf.as_partition_start().key());
+ } else if (mf.is_end_of_partition()) {
+ scrubbed_mt->apply(std::move(*mut));
+ mut.reset();
+ } else {
+ mut->apply(std::move(mf));
+ }
+ });
- testlog.info("Writing done");
- writer.consume_end_of_stream();
- }
sst->load().get();
testlog.info("Loaded sstable {}", sst->get_filename());