In scrub mode the sstable reader will try hard to not fail on parse
errors. When a parse error happens, it will try to skip to the closest
indexed position in the sstable file. For this it first calls
`index_reader::advance_to_after()` to see if there is such a position in
the current partition. If this fails it skips to the next partition.
The reader obtains a new constructor argument flag which can be used to
turn this on (it defaults to false). This mode will be used by scrub
compaction to enable using it to salvage data from corrupted sstables.
sstables/partition.cc | 98 +++++++++++++++++++++++++++++++++++++++----
1 file changed, 89 insertions(+), 9 deletions(-)
diff --git a/sstables/partition.cc b/sstables/partition.cc
index 727194d71..6e6e4ebd0 100644
--- a/sstables/partition.cc
+++ b/sstables/partition.cc
@@ -122,9 +122,29 @@ void set_range_tombstone_start_from_end_open_marker(Consumer& c, const schema& s
}
}
+static position_in_partition after_position(position_in_partition_view pos) {
+ switch (pos.region()) {
+ case partition_region::partition_start:
+ return position_in_partition::for_static_row();
+ case partition_region::static_row:
+ return position_in_partition::before_all_clustered_rows();
+ case partition_region::clustered:
+ return position_in_partition::after_key(pos);
+ case partition_region::partition_end:
+ return position_in_partition::for_partition_start();
+ }
+}
+
template <typename DataConsumeRowsContext = data_consume_rows_context, typename Consumer = mp_row_consumer_k_l>
requires RowConsumer<Consumer>
class sstable_mutation_reader : public mp_row_consumer_reader {
+public:
+ /// In scrub mode the reader will try hard to read corrupt sstables by
+ /// jumping to the next indexed position when encountering parse errors.
+ /// Position ranges skipped this way will be logged with warning level.
+ using scrub_mode = bool_class<struct scrub_mode_tag>;
+
+private:
Consumer _consumer;
bool _will_likely_slice = false;
bool _read_enabled = true;
@@ -135,13 +155,15 @@ class sstable_mutation_reader : public mp_row_consumer_reader {
std::function<future<> ()> _initialize;
streamed_mutation::forwarding _fwd;
read_monitor& _monitor;
+ scrub_mode _scrub_mode = scrub_mode::no;
public:
sstable_mutation_reader(shared_sstable sst, schema_ptr schema,
reader_permit permit,
const io_priority_class &pc,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
- read_monitor& mon)
+ read_monitor& mon,
+ scrub_mode scrub = scrub_mode::no)
: mp_row_consumer_reader(std::move(schema), permit, std::move(sst))
, _consumer(this, _schema, std::move(permit), _schema->full_slice(), pc, std::move(trace_state), fwd, _sst)
, _initialize([this] {
@@ -150,7 +172,8 @@ class sstable_mutation_reader : public mp_row_consumer_reader {
return make_ready_future<>();
})
, _fwd(fwd)
- , _monitor(mon) { }
+ , _monitor(mon)
+ , _scrub_mode(scrub) { }
sstable_mutation_reader(shared_sstable sst,
schema_ptr schema,
reader_permit permit,
@@ -160,7 +183,8 @@ class sstable_mutation_reader : public mp_row_consumer_reader {
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr,
- read_monitor& mon)
+ read_monitor& mon,
+ scrub_mode scrub = scrub_mode::no)
: mp_row_consumer_reader(std::move(schema), permit, std::move(sst))
, _consumer(this, _schema, std::move(permit), slice, pc, std::move(trace_state), fwd, _sst)
, _initialize([this, pr, &pc, &slice, fwd_mr] () mutable {
@@ -178,7 +202,8 @@ class sstable_mutation_reader : public mp_row_consumer_reader {
});
})
, _fwd(fwd)
- , _monitor(mon) { }
+ , _monitor(mon)
+ , _scrub_mode(scrub) { }
sstable_mutation_reader(shared_sstable sst,
schema_ptr schema,
reader_permit permit,
@@ -188,7 +213,8 @@ class sstable_mutation_reader : public mp_row_consumer_reader {
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr,
- read_monitor& mon)
+ read_monitor& mon,
+ scrub_mode scrub = scrub_mode::no)
: mp_row_consumer_reader(std::move(schema), permit, std::move(sst))
, _consumer(this, _schema, std::move(permit), slice, pc, std::move(trace_state), fwd, _sst)
, _single_partition_read(true)
@@ -215,7 +241,8 @@ class sstable_mutation_reader : public mp_row_consumer_reader {
});
})
, _fwd(fwd)
- , _monitor(mon) { }
+ , _monitor(mon)
+ , _scrub_mode(scrub) { }
// Reference to _consumer is passed to data_consume_rows() in the constructor so we must not allow move/copy
sstable_mutation_reader(sstable_mutation_reader&&) = delete;
@@ -370,6 +397,53 @@ class sstable_mutation_reader : public mp_row_consumer_reader {
}
return _initialize();
}
+ future<position_range> skip_to_next_indexed_clustering_row() {
+ if (!_index_in_current_partition) {
+ return get_index_reader().advance_to(*_current_partition_key).then([this] {
+ _index_in_current_partition = true;
+ return skip_to_next_indexed_clustering_row();
+ });
+ }
+ return _index_reader->advance_to_after(_last_pos.get(), _context->reader_position().position).then([this] (position_in_partition pos) {
+ const auto have_clustering_pos = !pos.is_after_all_clustered_rows(*_schema);
+ position_range range(after_position(_last_pos.get()), pos);
+
+ if (_fwd == streamed_mutation::forwarding::yes) {
+ if (!_consumer.is_in_range(pos) || !have_clustering_pos) {
+ // The returned position is either outside the current
+ // fast-forwarding window or it is outside the current
+ // partition, we report EOS. The next fast_forward_to() call
+ // will take care of the skip.
+ _end_of_stream = true;
+ return make_ready_future<position_range>(std::move(range));
+ }
+ } else if (!_last_pos.get().is_partition_end() && !have_clustering_pos) {
+ // We will have to jump to another partition, so we need to close
+ // the current one if we were in the middle of it.
+ push_mutation_fragment_and_update_last_pos(mutation_fragment(*_schema, _permit, partition_end{}));
+ }
+
+ // If we failed to find a position inside the current partition to
+ // skip to, we declare the current partition finished and let the
+ // reader jump to the next one.
+ if (!have_clustering_pos) {
+ _partition_finished = true;
+ return make_ready_future<position_range>(std::move(range));
+ }
+
+ index_reader& idx = *_index_reader;
+ auto index_position = idx.data_file_positions();
+
+ assert(idx.element_kind() == indexable_element::cell);
+ assert(_context->need_skip(index_position.start));
+
+ return _context->skip_to(idx.element_kind(), index_position.start).then([this, &idx, range = std::move(range)] () mutable {
+ _sst->get_stats().on_partition_seek();
+ set_range_tombstone_start_from_end_open_marker(_consumer, *_schema, idx);
+ return std::move(range);
+ });
+ });
+ }
public:
void on_out_of_clustering_range() override {
if (_fwd == streamed_mutation::forwarding::yes) {
@@ -420,7 +494,7 @@ class sstable_mutation_reader : public mp_row_consumer_reader {
}
});
}
- return do_until([this] { return is_end_of_stream() || is_buffer_full(); }, [this] {
+ return do_until([this] { return is_end_of_stream() || is_buffer_full(); }, [this, timeout] {
if (_partition_finished) {
if (_before_partition) {
return read_partition();
@@ -433,8 +507,14 @@ class sstable_mutation_reader : public mp_row_consumer_reader {
if (is_buffer_full() || _partition_finished || _end_of_stream) {
return make_ready_future<>();
}
- return advance_context(_consumer.maybe_skip()).then([this] {
- return _context->read().handle_exception([this] (std::exception_ptr ex) {
+ return advance_context(_consumer.maybe_skip()).then([this, timeout] {
+ return _context->read().handle_exception([this, timeout] (std::exception_ptr ex) {
+ if (_scrub_mode == scrub_mode::yes) {
+ return skip_to_next_indexed_clustering_row().then([this, timeout, ex = std::move(ex)] (position_range range) mutable {
+ sstlog.warn("reader (scrub mode) {}: skipping over position range {} due to parse error: {}",
+ fmt::ptr(this), range, std::move(ex));
+ });
+ }
if (!_current_partition_key) {
return make_exception_future<>(std::move(ex));
}
--
2.28.0