[RFC PATCH v1 0/5] Scrub compaction: extend to also salvage corrupt data

Botond Dénes

<bdenes@scylladb.com>
Nov 24, 2020, 11:02:38 AM
to scylladb-dev@googlegroups.com, Botond Dénes
Currently scrub compaction is only able to drop out-of-order and
duplicate rows from the data. This series extends it so that it can also
be used to salvage parts of a corrupt sstable that fails to parse. This
new feature uses the index to skip over corrupt parts of the sstable. Of
course, the effectiveness of this method is limited, as it relies on
several assumptions:
* It assumes that the index is not corrupt, at least for the parts where
the sstable is corrupt.
* It assumes that the corruption is localized to certain rows or parts
of them.
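To make the two assumptions concrete, here is a toy model in C++. It is not Scylla code: `row`, `salvage_result` and `salvage()` are hypothetical names, the data file is modeled as a vector of rows (some of which fail to parse), and the index as a sorted list of row offsets. On a parse error the model jumps to the first indexed offset strictly after the failing row, so it only recovers data if the index is intact and the corruption stays between index entries.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical toy model, not Scylla code.
struct row {
    std::string key;
    bool corrupt = false; // true: parsing this row throws
};

struct salvage_result {
    std::vector<std::string> recovered;                        // keys read successfully
    std::vector<std::pair<std::size_t, std::size_t>> skipped;  // [from, to) row offsets
};

// `index_offsets` holds the sorted row offsets the (assumed intact) index points to.
salvage_result salvage(const std::vector<row>& data, const std::vector<std::size_t>& index_offsets) {
    salvage_result res;
    std::size_t pos = 0;
    while (pos < data.size()) {
        if (!data[pos].corrupt) {
            res.recovered.push_back(data[pos].key);
            ++pos;
            continue;
        }
        // Parse error: resume at the first indexed offset strictly after `pos`,
        // or give up on the rest of the data if there is none.
        auto it = std::upper_bound(index_offsets.begin(), index_offsets.end(), pos);
        std::size_t next = (it == index_offsets.end()) ? data.size() : *it;
        res.skipped.emplace_back(pos, next);
        pos = next;
    }
    return res;
}
```

With rows `a, b, c, d, e, f` where `b` and `e` are corrupt, and index offsets `{0, 3, 5}`, the model recovers `a`, `d` and `f` and skips the offset ranges [1, 3) and [4, 5). Row `c` is intact but still lost, because the reader can only resynchronize at indexed positions.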

This seems to be enough to successfully scrub the corrupted sstables
seen in the recently opened issue #7623, where the clustering key as
well as the contents of certain rows are corrupt, but the rest of the
sstable parses fine.

Fixes: #7658

Also on: https://github.com/denesb/scylla.git
sstable-reader-scrub-mode/v1-rfc

TODO:
* Wire into scrub compaction - this RFC only implements the reader part.
* More testing.

Botond Dénes (5):
sstables: clustered_index_cursor: add skip_info to entry_info
sstables: index_reader: add advance_to_after()
sstables: sstable_reader: track last position seen by the reader
sstables: consumer: add is_in_range() helper
sstables: sstable_reader: add scrub mode

sstables/index_entry.hh | 2 +-
sstables/index_reader.hh | 70 ++++++++++++++
sstables/mp_row_consumer.hh | 50 ++++++++--
sstables/mx/bsearch_clustered_cursor.hh | 17 +++-
sstables/scanning_clustered_index_cursor.hh | 5 +-
sstables/partition.cc | 102 +++++++++++++++++---
6 files changed, 220 insertions(+), 26 deletions(-)

--
2.28.0

Botond Dénes

<bdenes@scylladb.com>
Nov 24, 2020, 11:02:40 AM
to scylladb-dev@googlegroups.com, Botond Dénes
So that `clustered_index_cursor::next_entry()` also returns enough
information to skip to the returned entry's position in the data
file.

Signed-off-by: Botond Dénes <bde...@scylladb.com>
---
sstables/index_entry.hh | 2 +-
sstables/mx/bsearch_clustered_cursor.hh | 17 ++++++++++++++---
sstables/scanning_clustered_index_cursor.hh | 5 +++--
3 files changed, 18 insertions(+), 6 deletions(-)

diff --git a/sstables/index_entry.hh b/sstables/index_entry.hh
index a7f2a1d8c..2687a5f67 100644
--- a/sstables/index_entry.hh
+++ b/sstables/index_entry.hh
@@ -358,7 +358,7 @@ class clustered_index_cursor {
struct entry_info {
promoted_index_block_position_view start;
promoted_index_block_position_view end;
- offset_in_partition offset;
+ skip_info sinfo;
};

virtual ~clustered_index_cursor() {};
diff --git a/sstables/mx/bsearch_clustered_cursor.hh b/sstables/mx/bsearch_clustered_cursor.hh
index 612597660..06a46d217 100644
--- a/sstables/mx/bsearch_clustered_cursor.hh
+++ b/sstables/mx/bsearch_clustered_cursor.hh
@@ -501,11 +501,22 @@ class bsearch_clustered_cursor : public clustered_index_cursor {
return make_ready_future<std::optional<entry_info>>(std::nullopt);
}
return _promoted_index.get_block(_current_idx, _trace_state)
- .then([this] (promoted_index_block* block) -> std::optional<entry_info> {
+ .then([this] (promoted_index_block* block) -> future<std::optional<entry_info>> {
sstlog.trace("mc_bsearch_clustered_cursor {}: block {}: start={}, end={}, offset={}", fmt::ptr(this), _current_idx,
*block->start, *block->end, block->data_file_offset);
- ++_current_idx;
- return entry_info{*block->start, *block->end, block->data_file_offset};
+ if (_current_idx < 1) {
+ ++_current_idx;
+ return make_ready_future<std::optional<entry_info>>(
+ entry_info{*block->start, *block->end, skip_info{block->data_file_offset, tombstone(), position_in_partition::before_all_clustered_rows()}});
+ }
+ return _promoted_index.get_block(_current_idx - 1, _trace_state).then([this, block] (promoted_index_block* prev_block) -> std::optional<entry_info> {
+ // XXX: Until we have automatic eviction, we need to invalidate cached index blocks
+ // as we walk so that memory footprint is not O(N) but O(log(N)).
+ _promoted_index.invalidate_prior(prev_block, _trace_state);
+ ++_current_idx;
+ auto tomb = prev_block->end_open_marker ? tombstone(*prev_block->end_open_marker) : tombstone();
+ return entry_info{*block->start, *block->end, skip_info{block->data_file_offset, tomb, *prev_block->end}};
+ });
});
}

diff --git a/sstables/scanning_clustered_index_cursor.hh b/sstables/scanning_clustered_index_cursor.hh
index dfed40739..3b08cf21d 100644
--- a/sstables/scanning_clustered_index_cursor.hh
+++ b/sstables/scanning_clustered_index_cursor.hh
@@ -209,8 +209,9 @@ class scanning_clustered_index_cursor : public clustered_index_cursor {
}

sstlog.trace("scanning_clustered_index_cursor {}: next_entry(), pi_idx={}", fmt::ptr(this), _current_pi_idx);
- promoted_index_block& block = pi_blocks[_current_pi_idx];
- auto ei = entry_info{block.start(_s), block.end(_s), block.offset()};
+ auto it = pi_blocks.cbegin() + _current_pi_idx;
+
+ auto ei = entry_info{it->start(_s), it->end(_s), get_info_from_promoted_block(it, pi_blocks)};
++_current_pi_idx;
trim_blocks();
return make_ready_future<std::optional<entry_info>>(std::move(ei));
--
2.28.0
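In the bsearch cursor change above, the `skip_info` for block i combines the data offset of block i with the range tombstone still open at the end of block i-1 (and that block's end position), while the first block gets no tombstone and `before_all_clustered_rows()`. A simplified model, assuming plain integers in place of positions and tombstones (`block`, `skip_info_model` and `make_skip_info()` are hypothetical names), might look like this; it treats a missing end open marker as "no active tombstone" rather than dereferencing it.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical stand-ins: positions and tombstone timestamps are plain integers.
struct block {
    int start;                                   // first clustering position in the block
    int end;                                     // last clustering position in the block
    std::uint64_t data_file_offset;              // block offset, relative to the partition
    std::optional<std::int64_t> end_open_marker; // tombstone still open at the block's end
};

struct skip_info_model {
    std::uint64_t offset;
    std::optional<std::int64_t> active_tombstone; // nullopt == no active tombstone
    std::optional<int> tombstone_pos;             // nullopt == "before all clustered rows"
};

// Skipping to block `idx` must restore whatever tombstone block `idx - 1` left open.
skip_info_model make_skip_info(const std::vector<block>& blocks, std::size_t idx) {
    const block& cur = blocks.at(idx);
    if (idx == 0) {
        return {cur.data_file_offset, std::nullopt, std::nullopt};
    }
    const block& prev = blocks[idx - 1];
    // If `prev` has no end open marker, active_tombstone simply stays disengaged.
    return {cur.data_file_offset, prev.end_open_marker, prev.end};
}
```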

Botond Dénes

<bdenes@scylladb.com>
Nov 24, 2020, 11:02:43 AM
to scylladb-dev@googlegroups.com, Botond Dénes
So that in the soon-to-be-added scrub mode, when a parse error occurs,
we have the last position to pass to
`index_reader::advance_to_after()`, which calculates the closest
position in the sstable we can skip to.
The position is tracked via views while the buffer is being filled, and
the last position is only copied (materialized) at the end, when the
consumer can start popping mutation fragments. This way we get away
with a single copy per buffer fill.

Signed-off-by: Botond Dénes <bde...@scylladb.com>
---
sstables/mp_row_consumer.hh | 42 +++++++++++++++++++++++++++++--------
sstables/partition.cc | 4 ++--
2 files changed, 35 insertions(+), 11 deletions(-)

diff --git a/sstables/mp_row_consumer.hh b/sstables/mp_row_consumer.hh
index 3230207f3..ed3f7d944 100644
--- a/sstables/mp_row_consumer.hh
+++ b/sstables/mp_row_consumer.hh
@@ -50,6 +50,24 @@ static inline bytes_view pop_back(std::vector<bytes_view>& vec) {
return b;
}

+class last_position_in_partition {
+ std::optional<position_in_partition> _materialized_pos;
+ position_in_partition_view _pos_view;
+
+public:
+ last_position_in_partition() : _materialized_pos(position_in_partition::end_of_partition_tag_t{}), _pos_view(*_materialized_pos) { }
+
+ void update(position_in_partition_view pos) {
+ _materialized_pos.reset();
+ _pos_view = pos;
+ }
+ void materialize() {
+ _materialized_pos.emplace(_pos_view);
+ _pos_view = *_materialized_pos;
+ }
+ position_in_partition_view get() const { return _pos_view; }
+};
+
class mp_row_consumer_reader : public flat_mutation_reader::impl {
friend class mp_row_consumer_k_l;
friend class mp_row_consumer_m;
@@ -70,6 +88,7 @@ class mp_row_consumer_reader : public flat_mutation_reader::impl {
bool _before_partition = true;

std::optional<dht::decorated_key> _current_partition_key;
+ last_position_in_partition _last_pos;
public:
mp_row_consumer_reader(schema_ptr s, reader_permit permit, shared_sstable sst)
: impl(std::move(s), std::move(permit))
@@ -83,6 +102,11 @@ class mp_row_consumer_reader : public flat_mutation_reader::impl {
virtual void on_out_of_clustering_range() = 0;

void on_next_partition(dht::decorated_key key, tombstone tomb);
+
+ void push_mutation_fragment_and_update_last_pos(mutation_fragment&& mf) {
+ push_mutation_fragment(std::move(mf));
+ _last_pos.update(this->buffer().back().position());
+ }
};

struct new_mutation {
@@ -291,7 +315,7 @@ class mp_row_consumer_k_l : public row_consumer {
}
break;
}
- _reader->push_mutation_fragment(std::move(*mfo));
+ _reader->push_mutation_fragment_and_update_last_pos(std::move(*mfo));
}
return proceed::no;
}
@@ -303,9 +327,9 @@ class mp_row_consumer_k_l : public row_consumer {
while (!_reader->is_buffer_full()) {
auto mfo = _range_tombstones.get_next(*_ready);
if (mfo) {
- _reader->push_mutation_fragment(std::move(*mfo));
+ _reader->push_mutation_fragment_and_update_last_pos(std::move(*mfo));
} else {
- _reader->push_mutation_fragment(std::move(*_ready));
+ _reader->push_mutation_fragment_and_update_last_pos(std::move(*_ready));
_ready = {};
return proceed(!_reader->is_buffer_full());
}
@@ -966,7 +990,7 @@ class mp_row_consumer_m : public consumer_m {
const auto action = _mf_filter->apply(rt);
switch (action) {
case mutation_fragment_filter::result::emit:
- _reader->push_mutation_fragment(mutation_fragment(*_schema, permit(), std::move(rt)));
+ _reader->push_mutation_fragment_and_update_last_pos(mutation_fragment(*_schema, permit(), std::move(rt)));
break;
case mutation_fragment_filter::result::ignore:
if (_mf_filter->out_of_range()) {
@@ -1065,7 +1089,7 @@ class mp_row_consumer_m : public consumer_m {
assert(_mf_filter);
switch (_mf_filter->apply(*mfopt)) {
case mutation_fragment_filter::result::emit:
- _reader->push_mutation_fragment(mutation_fragment(*_schema, permit(), *std::exchange(mfopt, {})));
+ _reader->push_mutation_fragment_and_update_last_pos(mutation_fragment(*_schema, permit(), *std::exchange(mfopt, {})));
break;
case mutation_fragment_filter::result::ignore:
mfopt.reset();
@@ -1386,7 +1410,7 @@ class mp_row_consumer_m : public consumer_m {
auto action = _mf_filter->apply(_in_progress_static_row);
switch (action) {
case mutation_fragment_filter::result::emit:
- _reader->push_mutation_fragment(mutation_fragment(*_schema, permit(), std::move(_in_progress_static_row)));
+ _reader->push_mutation_fragment_and_update_last_pos(mutation_fragment(*_schema, permit(), std::move(_in_progress_static_row)));
break;
case mutation_fragment_filter::result::ignore:
break;
@@ -1399,7 +1423,7 @@ class mp_row_consumer_m : public consumer_m {
if (!_cells.empty()) {
fill_cells(column_kind::regular_column, _in_progress_row->cells());
}
- _reader->push_mutation_fragment(mutation_fragment(*_schema, permit(), *std::exchange(_in_progress_row, {})));
+ _reader->push_mutation_fragment_and_update_last_pos(mutation_fragment(*_schema, permit(), *std::exchange(_in_progress_row, {})));
}

return proceed(!_reader->is_buffer_full());
@@ -1426,7 +1450,7 @@ class mp_row_consumer_m : public consumer_m {
_opened_range_tombstone->tomb};
sstlog.trace("mp_row_consumer_m {}: on_end_of_stream(), emitting last tombstone: {}", fmt::ptr(this), rt);
_opened_range_tombstone.reset();
- _reader->push_mutation_fragment(mutation_fragment(*_schema, permit(), std::move(rt)));
+ _reader->push_mutation_fragment_and_update_last_pos(mutation_fragment(*_schema, permit(), std::move(rt)));
}
}
if (!_reader->_partition_finished) {
@@ -1447,7 +1471,7 @@ class mp_row_consumer_m : public consumer_m {
_reader->_index_in_current_partition = false;
_reader->_partition_finished = true;
_reader->_before_partition = true;
- _reader->push_mutation_fragment(mutation_fragment(*_schema, permit(), partition_end()));
+ _reader->push_mutation_fragment_and_update_last_pos(mutation_fragment(*_schema, permit(), partition_end()));
return proceed::yes;
}

diff --git a/sstables/partition.cc b/sstables/partition.cc
index ade9fdf1c..727194d71 100644
--- a/sstables/partition.cc
+++ b/sstables/partition.cc
@@ -375,7 +375,7 @@ class sstable_mutation_reader : public mp_row_consumer_reader {
if (_fwd == streamed_mutation::forwarding::yes) {
_end_of_stream = true;
} else {
- this->push_mutation_fragment(mutation_fragment(*_schema, _permit, partition_end()));
+ this->push_mutation_fragment_and_update_last_pos(mutation_fragment(*_schema, _permit, partition_end()));
_partition_finished = true;
}
}
@@ -481,7 +481,7 @@ void mp_row_consumer_reader::on_next_partition(dht::decorated_key key, tombstone
_before_partition = false;
_end_of_stream = false;
_current_partition_key = std::move(key);
- push_mutation_fragment(
+ push_mutation_fragment_and_update_last_pos(
mutation_fragment(*_schema, _permit, partition_start(*_current_partition_key, tomb)));
_sst->get_stats().on_partition_read();
}
--
2.28.0
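The view-then-materialize idea from this patch can be illustrated with standard library types. This is an analogy, not Scylla code: `std::string_view` stands in for `position_in_partition_view`, `std::string` for `position_in_partition`, and `last_position_tracker` is a hypothetical name. During a buffer fill only the cheap view is updated; `materialize()` copies once, before the buffer the view points into is consumed. Like the class in the patch, the tracker must not be copied or moved, since its view may point into its own member.

```cpp
#include <cassert>
#include <deque>
#include <optional>
#include <string>
#include <string_view>

// Hypothetical analogue of last_position_in_partition.
class last_position_tracker {
    std::optional<std::string> _materialized; // owned copy, engaged only when materialized
    std::string_view _view;                   // points into the buffer or into _materialized
public:
    last_position_tracker() : _materialized("end-of-partition"), _view(*_materialized) { }
    // Copying would leave the copy's view pointing into the original object.
    last_position_tracker(const last_position_tracker&) = delete;
    last_position_tracker& operator=(const last_position_tracker&) = delete;

    // Cheap: only remembers where the last pushed element lives.
    void update(std::string_view pos) {
        _materialized.reset();
        _view = pos;
    }
    // One copy per buffer fill: detach from the buffer before it gets popped.
    void materialize() {
        if (!_materialized) {
            _materialized.emplace(_view);
            _view = *_materialized;
        }
    }
    std::string_view get() const { return _view; }
};
```

A `std::deque` works as the buffer in this analogy because `push_back()` does not invalidate references to existing elements; only popping or clearing does, which is exactly when `materialize()` has to run first.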

Botond Dénes

<bdenes@scylladb.com>
Nov 24, 2020, 11:02:43 AM
to scylladb-dev@googlegroups.com, Botond Dénes
In scrub mode the sstable reader tries hard not to fail on parse
errors. When a parse error happens, it tries to skip to the closest
indexed position in the sstable file. For this it first calls
`index_reader::advance_to_after()` to see if there is such a position in
the current partition. If there is none, it skips to the next partition.
The reader gains a new constructor parameter which can be used to turn
this on (it defaults to `scrub_mode::no`). Scrub compaction will use
this mode to salvage data from corrupted sstables.

Signed-off-by: Botond Dénes <bde...@scylladb.com>
---
sstables/partition.cc | 98 +++++++++++++++++++++++++++++++++++++++----
1 file changed, 89 insertions(+), 9 deletions(-)

diff --git a/sstables/partition.cc b/sstables/partition.cc
index 727194d71..6e6e4ebd0 100644
--- a/sstables/partition.cc
+++ b/sstables/partition.cc
@@ -122,9 +122,29 @@ void set_range_tombstone_start_from_end_open_marker(Consumer& c, const schema& s
}
}

+static position_in_partition after_position(position_in_partition_view pos) {
+ switch (pos.region()) {
+ case partition_region::partition_start:
+ return position_in_partition::for_static_row();
+ case partition_region::static_row:
+ return position_in_partition::before_all_clustered_rows();
+ case partition_region::clustered:
+ return position_in_partition::after_key(pos);
+ case partition_region::partition_end:
+ return position_in_partition::for_partition_start();
+ }
+}
+
template <typename DataConsumeRowsContext = data_consume_rows_context, typename Consumer = mp_row_consumer_k_l>
requires RowConsumer<Consumer>
class sstable_mutation_reader : public mp_row_consumer_reader {
+public:
+ /// In scrub mode the reader will try hard to read corrupt sstables by
+ /// jumping to the next indexed position when encountering parse errors.
+ /// Position ranges skipped this way will be logged with warning level.
+ using scrub_mode = bool_class<struct scrub_mode_tag>;
+
+private:
Consumer _consumer;
bool _will_likely_slice = false;
bool _read_enabled = true;
@@ -135,13 +155,15 @@ class sstable_mutation_reader : public mp_row_consumer_reader {
std::function<future<> ()> _initialize;
streamed_mutation::forwarding _fwd;
read_monitor& _monitor;
+ scrub_mode _scrub_mode = scrub_mode::no;
public:
sstable_mutation_reader(shared_sstable sst, schema_ptr schema,
reader_permit permit,
const io_priority_class &pc,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
- read_monitor& mon)
+ read_monitor& mon,
+ scrub_mode scrub = scrub_mode::no)
: mp_row_consumer_reader(std::move(schema), permit, std::move(sst))
, _consumer(this, _schema, std::move(permit), _schema->full_slice(), pc, std::move(trace_state), fwd, _sst)
, _initialize([this] {
@@ -150,7 +172,8 @@ class sstable_mutation_reader : public mp_row_consumer_reader {
return make_ready_future<>();
})
, _fwd(fwd)
- , _monitor(mon) { }
+ , _monitor(mon)
+ , _scrub_mode(scrub) { }
sstable_mutation_reader(shared_sstable sst,
schema_ptr schema,
reader_permit permit,
@@ -160,7 +183,8 @@ class sstable_mutation_reader : public mp_row_consumer_reader {
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr,
- read_monitor& mon)
+ read_monitor& mon,
+ scrub_mode scrub = scrub_mode::no)
: mp_row_consumer_reader(std::move(schema), permit, std::move(sst))
, _consumer(this, _schema, std::move(permit), slice, pc, std::move(trace_state), fwd, _sst)
, _initialize([this, pr, &pc, &slice, fwd_mr] () mutable {
@@ -178,7 +202,8 @@ class sstable_mutation_reader : public mp_row_consumer_reader {
});
})
, _fwd(fwd)
- , _monitor(mon) { }
+ , _monitor(mon)
+ , _scrub_mode(scrub) { }
sstable_mutation_reader(shared_sstable sst,
schema_ptr schema,
reader_permit permit,
@@ -188,7 +213,8 @@ class sstable_mutation_reader : public mp_row_consumer_reader {
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr,
- read_monitor& mon)
+ read_monitor& mon,
+ scrub_mode scrub = scrub_mode::no)
: mp_row_consumer_reader(std::move(schema), permit, std::move(sst))
, _consumer(this, _schema, std::move(permit), slice, pc, std::move(trace_state), fwd, _sst)
, _single_partition_read(true)
@@ -215,7 +241,8 @@ class sstable_mutation_reader : public mp_row_consumer_reader {
});
})
, _fwd(fwd)
- , _monitor(mon) { }
+ , _monitor(mon)
+ , _scrub_mode(scrub) { }

// Reference to _consumer is passed to data_consume_rows() in the constructor so we must not allow move/copy
sstable_mutation_reader(sstable_mutation_reader&&) = delete;
@@ -370,6 +397,53 @@ class sstable_mutation_reader : public mp_row_consumer_reader {
}
return _initialize();
}
+ future<position_range> skip_to_next_indexed_clustering_row() {
+ if (!_index_in_current_partition) {
+ return get_index_reader().advance_to(*_current_partition_key).then([this] {
+ _index_in_current_partition = true;
+ return skip_to_next_indexed_clustering_row();
+ });
+ }
+ return _index_reader->advance_to_after(_last_pos.get(), _context->reader_position().position).then([this] (position_in_partition pos) {
+ const auto have_clustering_pos = !pos.is_after_all_clustered_rows(*_schema);
+ position_range range(after_position(_last_pos.get()), pos);
+
+ if (_fwd == streamed_mutation::forwarding::yes) {
+ if (!_consumer.is_in_range(pos) || !have_clustering_pos) {
+ // The returned position is either outside the current
+ // fast-forwarding window or it is outside the current
+ // partition, we report EOS. The next fast_forward_to() call
+ // will take care of the skip.
+ _end_of_stream = true;
+ return make_ready_future<position_range>(std::move(range));
+ }
+ } else if (!_last_pos.get().is_partition_end() && !have_clustering_pos) {
+ // We will have to jump to another partition, so we need to close
+ // the current one if we were in the middle of it.
+ push_mutation_fragment_and_update_last_pos(mutation_fragment(*_schema, _permit, partition_end{}));
+ }
+
+ // If we failed to find a position inside the current partition to
+ // skip to, we declare the current partition finished and let the
+ // reader jump to the next one.
+ if (!have_clustering_pos) {
+ _partition_finished = true;
+ return make_ready_future<position_range>(std::move(range));
+ }
+
+ index_reader& idx = *_index_reader;
+ auto index_position = idx.data_file_positions();
+
+ assert(idx.element_kind() == indexable_element::cell);
+ assert(_context->need_skip(index_position.start));
+
+ return _context->skip_to(idx.element_kind(), index_position.start).then([this, &idx, range = std::move(range)] () mutable {
+ _sst->get_stats().on_partition_seek();
+ set_range_tombstone_start_from_end_open_marker(_consumer, *_schema, idx);
+ return std::move(range);
+ });
+ });
+ }
public:
void on_out_of_clustering_range() override {
if (_fwd == streamed_mutation::forwarding::yes) {
@@ -420,7 +494,7 @@ class sstable_mutation_reader : public mp_row_consumer_reader {
}
});
}
- return do_until([this] { return is_end_of_stream() || is_buffer_full(); }, [this] {
+ return do_until([this] { return is_end_of_stream() || is_buffer_full(); }, [this, timeout] {
if (_partition_finished) {
if (_before_partition) {
return read_partition();
@@ -433,8 +507,14 @@ class sstable_mutation_reader : public mp_row_consumer_reader {
if (is_buffer_full() || _partition_finished || _end_of_stream) {
return make_ready_future<>();
}
- return advance_context(_consumer.maybe_skip()).then([this] {
- return _context->read().handle_exception([this] (std::exception_ptr ex) {
+ return advance_context(_consumer.maybe_skip()).then([this, timeout] {
+ return _context->read().handle_exception([this, timeout] (std::exception_ptr ex) {
+ if (_scrub_mode == scrub_mode::yes) {
+ return skip_to_next_indexed_clustering_row().then([this, timeout, ex = std::move(ex)] (position_range range) mutable {
+ sstlog.warn("reader (scrub mode) {}: skipping over position range {} due to parse error: {}",
+ fmt::ptr(this), range, std::move(ex));
+ });
+ }
if (!_current_partition_key) {
return make_exception_future<>(std::move(ex));
}
--
2.28.0
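The recovery path in `skip_to_next_indexed_clustering_row()` boils down to a three-way decision on the position returned by `advance_to_after()`. The following is a hypothetical, simplified model of that decision only: the enum and function names are made up, positions are reduced to two booleans, and no I/O is performed.

```cpp
#include <cassert>

// Hypothetical model of the outcomes in skip_to_next_indexed_clustering_row().
enum class scrub_action {
    end_of_stream,     // forwarding mode: the next fast_forward_to() takes care of the skip
    finish_partition,  // no indexed position left in this partition: move to the next one
    skip_in_partition, // seek the data file to the found promoted-index block
};

// `forwarding`: the reader runs with streamed_mutation::forwarding::yes.
// `have_clustering_pos`: advance_to_after() found a block inside the current partition.
// `in_fwd_range`: that position lies inside the current fast-forwarding window
//                 (only consulted in forwarding mode, as in the patch).
scrub_action decide_scrub_action(bool forwarding, bool have_clustering_pos, bool in_fwd_range) {
    if (forwarding && (!have_clustering_pos || !in_fwd_range)) {
        return scrub_action::end_of_stream;
    }
    if (!have_clustering_pos) {
        // Without forwarding, the reader also emits partition_end here,
        // unless the last emitted fragment already was one.
        return scrub_action::finish_partition;
    }
    return scrub_action::skip_in_partition;
}
```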

Botond Dénes

<bdenes@scylladb.com>
Nov 24, 2020, 11:02:43 AM
to scylladb-dev@googlegroups.com, Botond Dénes
So that the caller can determine whether a given position is inside
the current fast-forward-to window.

Signed-off-by: Botond Dénes <bde...@scylladb.com>
---
sstables/mp_row_consumer.hh | 8 ++++++++
1 file changed, 8 insertions(+)

diff --git a/sstables/mp_row_consumer.hh b/sstables/mp_row_consumer.hh
index ed3f7d944..51a16afb8 100644
--- a/sstables/mp_row_consumer.hh
+++ b/sstables/mp_row_consumer.hh
@@ -889,6 +889,10 @@ class mp_row_consumer_k_l : public row_consumer {
return start;
}

+ bool is_in_range(position_in_partition_view pos) {
+ return position_in_partition::tri_compare(*_schema)(pos, _fwd_end) <= 0;
+ }
+
bool needs_skip() const {
return (_skip_in_progress || !_in_progress)
&& _last_lower_bound_counter != _ck_ranges_walker->lower_bound_change_counter();
@@ -1143,6 +1147,10 @@ class mp_row_consumer_m : public consumer_m {
return skip;
}

+ bool is_in_range(position_in_partition_view pos) {
+ return position_in_partition::tri_compare(*_schema)(pos, _mf_filter->uppermost_bound()) <= 0;
+ }
+
/*
* Sets the range tombstone start. Overwrites the currently set RT start if any.
* Used for skipping through wide partitions using index when the data block
--
2.28.0
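Both `is_in_range()` overloads treat the upper bound of the current window as inclusive (`tri_compare(pos, bound) <= 0`). A hypothetical model, using `std::pair` (region, clustering key) as a stand-in for `position_in_partition` and relying on the standard lexicographic `operator<` of `std::pair`:

```cpp
#include <cassert>
#include <utility>

// Hypothetical position model: (region, clustering key), with regions ordered as in a
// partition: 0 = partition start, 1 = static row, 2 = clustered, 3 = partition end.
using position = std::pair<int, int>;

// Mirrors `tri_compare(pos, upper_bound) <= 0`: the bound itself counts as in range.
bool is_in_range(const position& pos, const position& upper_bound) {
    return !(upper_bound < pos);
}
```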

Botond Dénes

<bdenes@scylladb.com>
Nov 24, 2020, 11:02:43 AM
to scylladb-dev@googlegroups.com, Botond Dénes
This method can be used to advance the promoted index to the closest
entry strictly after both the given position-in-partition and the given
data file position. It will be used to skip over corrupt parts of an
sstable when such parts are found during parsing. In that case, the
method is passed the position of the last successfully parsed mutation
fragment and the current position in the data file.

Signed-off-by: Botond Dénes <bde...@scylladb.com>
---
sstables/index_reader.hh | 70 ++++++++++++++++++++++++++++++++++++++++
1 file changed, 70 insertions(+)

diff --git a/sstables/index_reader.hh b/sstables/index_reader.hh
index fd1d1e61f..b4071201f 100644
--- a/sstables/index_reader.hh
+++ b/sstables/index_reader.hh
@@ -771,6 +771,76 @@ class index_reader {
});
}

+ /// Advances the index to the first promoted-index entry whose start
+ /// position is larger than `last_pos` and whose disk offset is larger than
+ /// `last_data_file_pos`.
+ /// The index has to be positioned in the current partition.
+ /// Returns the start position of the found promoted index entry if
+ /// successful, or position_in_partition::after_all_clustered_rows()
+ /// otherwise.
+ /// The index's lower bound is only set if the advance was successful;
+ /// it is left unchanged otherwise.
+ future<position_in_partition> advance_to_after(position_in_partition_view last_pos, uint64_t last_data_file_pos) {
+ index_entry& e = current_partition_entry();
+
+ if (!e.get_promoted_index()) {
+ sstlog.trace("index {}: advance_to_after({}, {}): no promoted index", fmt::ptr(this), last_pos, last_data_file_pos);
+ // No promoted index, no other indexed position in the partition.
+ return make_ready_future<position_in_partition>(position_in_partition(position_in_partition::after_all_clustered_rows()));
+ }
+
+ promoted_index& pi = *e.get_promoted_index();
+ return pi.cursor().advance_to(last_pos).then([this, &e, &pi, last_pos, last_data_file_pos] (std::optional<clustered_index_cursor::skip_info> si) {
+ sstlog.trace("index {}: advance_to_after({}, {}): advancing cursor to last_pos, in the same block: {}",
+ fmt::ptr(this), last_pos, last_data_file_pos, bool(si));
+ return do_with(std::optional<clustered_index_cursor::entry_info>{},
+ [this, &e, &pi, last_pos, last_data_file_pos] (std::optional<clustered_index_cursor::entry_info>& ei) {
+ return repeat([&e, &pi, &ei, last_data_file_pos] {
+ return pi.cursor().next_entry().then([&e, &ei, last_data_file_pos] (std::optional<clustered_index_cursor::entry_info> current_ei) {
+ if (!current_ei) {
+ return stop_iteration::yes;
+ }
+ if (current_ei->sinfo.offset > (last_data_file_pos - e.position())) {
+ ei = std::move(current_ei);
+ return stop_iteration::yes;
+ }
+ return stop_iteration::no;
+ });
+ }).then([this, &e, &ei, last_pos, last_data_file_pos] {
+ if (!ei) {
+ sstlog.trace("index {}: advance_to_after({}, {}): no more promoted index entries", fmt::ptr(this), last_pos, last_data_file_pos);
+ // No more indexed clustering rows in the partition.
+ return position_in_partition(position_in_partition::after_all_clustered_rows());
+ }
+
+ auto& si = ei->sinfo;
+ if (!si.active_tombstone) {
+ // End open marker can be only engaged in SSTables 3.x ('mc' format) and never in ka/la
+ _lower_bound.end_open_marker.reset();
+ } else {
+ _lower_bound.end_open_marker = open_rt_marker{std::move(si.active_tombstone_pos), si.active_tombstone};
+ }
+ _lower_bound.data_file_position = e.position() + si.offset;
+ _lower_bound.element = indexable_element::cell;
+
+ auto pos = std::visit([this] (auto start) {
+ using bound_type = decltype(start);
+ if constexpr (std::is_same_v<bound_type, position_in_partition_view>) {
+ return position_in_partition(start);
+ } else {
+ return position_in_partition::for_key(clustering_key::from_exploded(start.values()));
+ }
+ }, ei->start);
+
+ sstlog.trace("index {}: advance_to_after({}, {}): skipped to ck={}, _data_file_position={}",
+ fmt::ptr(this), last_pos, last_data_file_pos, pos, _lower_bound.data_file_position);
+
+ return pos;
+ });
+ });
+ });
+ }
+
// Like advance_to(dht::ring_position_view), but returns information whether the key was found
// If upper_bound is provided, the upper bound within position is looked up
future<bool> advance_lower_and_check_if_present(
--
2.28.0
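Stripped of the futures and the promoted-index cursor, the contract in the doc comment of `advance_to_after()` is a search for the first entry satisfying two conditions. The model below is hypothetical (`pi_entry` and `find_after()` are made-up names, clustering positions are integers, and entries are scanned linearly instead of via the cursor); `std::nullopt` plays the role of `after_all_clustered_rows()`.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical promoted-index entry.
struct pi_entry {
    int start;            // start clustering position of the block
    std::uint64_t offset; // block offset relative to the partition start
};

// First entry whose start is after `last_pos` and whose absolute data-file
// offset is after `last_data_file_pos`; nullopt: nothing left in this partition.
std::optional<pi_entry> find_after(const std::vector<pi_entry>& entries,
                                   std::uint64_t partition_start,
                                   int last_pos,
                                   std::uint64_t last_data_file_pos) {
    for (const auto& e : entries) {
        if (e.start > last_pos && partition_start + e.offset > last_data_file_pos) {
            return e;
        }
    }
    return std::nullopt;
}
```

The model compares absolute offsets (`partition_start + e.offset`) rather than computing `last_data_file_pos - partition_start`, which sidesteps the unsigned wrap-around the subtraction would produce if the data position were ever before the partition start.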

Botond Dénes

<bdenes@scylladb.com>
Nov 24, 2020, 11:07:28 AM
to scylladb-dev@googlegroups.com, Botond Dénes
Currently scrub compaction is only able to drop out-of-order and
duplicate rows from the data. This series extends it so that it can also
be used to salvage parts of a corrupt sstable that fails to parse. This
new feature uses the index to skip over corrupt parts of the sstable. Of
course, the effectiveness of this method is limited, as it relies on
several assumptions:
* It assumes that the index is not corrupt, at least for the parts where
the sstable is corrupt.
* It assumes that the corruption is localized to certain rows or parts
of them.

This seems to be enough to successfully scrub the corrupted sstables
seen in the recently opened issue #7623, where the clustering key as
well as the contents of certain rows are corrupt, but the rest of the
sstable parses fine.

Fixes: #7658

Also on: https://github.com/denesb/scylla.git
sstable-reader-scrub-mode/v1-rfc

TODO:
* Wire into scrub compaction - this RFC only implements the reader part.
* More testing.

Botond Dénes (6):
sstables: sstable_reader: add partition context to exceptions
sstables: clustered_index_cursor: add skip_info to entry_info
sstables: index_reader: add advance_to_after()
sstables: sstable_reader: track last position seen by the reader
sstables: consumer: add is_in_range() helper
sstables: sstable_reader: add scrub mode

sstables/index_entry.hh | 2 +-
sstables/index_reader.hh | 70 ++++++++++++
sstables/mp_row_consumer.hh | 50 +++++++--
sstables/mx/bsearch_clustered_cursor.hh | 17 ++-
sstables/scanning_clustered_index_cursor.hh | 5 +-
sstables/partition.cc | 113 +++++++++++++++++---
6 files changed, 230 insertions(+), 27 deletions(-)

--
2.28.0

Botond Dénes

<bdenes@scylladb.com>
Nov 24, 2020, 11:07:29 AM
to scylladb-dev@googlegroups.com, Botond Dénes
If the current partition key is known, add it to any error bubbling up
from parsing, so that the error can be narrowed down to the affected
partition.

Signed-off-by: Botond Dénes <bde...@scylladb.com>
---
sstables/partition.cc | 13 +++++++++++--
1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/sstables/partition.cc b/sstables/partition.cc
index 367859d1a..ade9fdf1c 100644
--- a/sstables/partition.cc
+++ b/sstables/partition.cc
@@ -428,13 +428,22 @@ class sstable_mutation_reader : public mp_row_consumer_reader {
return read_next_partition();
}
} else {
- return do_until([this] { return is_buffer_full() || _partition_finished || _end_of_stream; }, [this] {
+ return do_until([this] { return is_buffer_full() || _partition_finished || _end_of_stream; }, [this, timeout] {
_consumer.push_ready_fragments();
if (is_buffer_full() || _partition_finished || _end_of_stream) {
return make_ready_future<>();
}
return advance_context(_consumer.maybe_skip()).then([this] {
- return _context->read();
+ return _context->read().handle_exception([this] (std::exception_ptr ex) {
+ if (!_current_partition_key) {
+ return make_exception_future<>(std::move(ex));
+ }
+ try {
+ std::rethrow_exception(std::move(ex));
+ } catch (...) {
+ std::throw_with_nested(std::runtime_error(fmt::format("Error reading from partition {}", *_current_partition_key)));
+ }
+ });
});
});
}
--
2.28.0
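The patch wraps the original parse error with `std::throw_with_nested()`, so the original exception is preserved inside the new `std::runtime_error`. A caller that wants the whole chain can unwrap it with `std::rethrow_if_nested()`. The sketch below uses only the standard library; `read_partition_or_throw()` and `describe()` are hypothetical names, not part of the patch.

```cpp
#include <cassert>
#include <exception>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for a parse that fails inside partition `key`.
void read_partition_or_throw(const std::string& key) {
    try {
        throw std::runtime_error("malformed clustering key");
    } catch (...) {
        // Same pattern as the patch: keep the original error, add the partition as context.
        std::throw_with_nested(std::runtime_error("Error reading from partition " + key));
    }
}

// Flattens a chain of nested exceptions into "outer: inner: ...".
std::string describe(const std::exception& e) {
    std::string msg = e.what();
    try {
        std::rethrow_if_nested(e); // no-op if `e` carries no nested exception
    } catch (const std::exception& inner) {
        msg += ": " + describe(inner);
    } catch (...) {
        msg += ": <unknown exception>";
    }
    return msg;
}
```

For example, catching the exception thrown by `read_partition_or_throw("pk1")` and passing it to `describe()` yields `Error reading from partition pk1: malformed clustering key`.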

Botond Dénes

<bdenes@scylladb.com>
Nov 24, 2020, 11:07:29 AM
to scylladb-dev@googlegroups.com, Botond Dénes
So that `clustered_index_cursor::next_entry()` also returns enough
information to skip to the returned entry's position in the data
file.

Signed-off-by: Botond Dénes <bde...@scylladb.com>
---

Botond Dénes

<bdenes@scylladb.com>
Nov 24, 2020, 11:07:30 AM
to scylladb-dev@googlegroups.com, Botond Dénes
This method can be used to advance the promoted index to the closest
entry strictly after both the given position-in-partition and the given
data file position. It will be used to skip over corrupt parts of an
sstable when such parts are found during parsing. In that case, the
method is passed the position of the last successfully parsed mutation
fragment and the current position in the data file.

Signed-off-by: Botond Dénes <bde...@scylladb.com>
---

Botond Dénes

<bdenes@scylladb.com>
Nov 24, 2020, 11:07:31 AM
to scylladb-dev@googlegroups.com, Botond Dénes
So that in the soon-to-be-added scrub mode, when a parse error occurs,
we have the last position to pass to
`index_reader::advance_to_after()`, which calculates the closest
position in the sstable we can skip to.
The position is tracked via views while the buffer is being filled, and
the last position is only copied (materialized) at the end, when the
consumer can start popping mutation fragments. This way we get away
with a single copy per buffer fill.

Signed-off-by: Botond Dénes <bde...@scylladb.com>
---
sstables/mp_row_consumer.hh | 42 +++++++++++++++++++++++++++++--------
sstables/partition.cc | 4 ++--
2 files changed, 35 insertions(+), 11 deletions(-)

Botond Dénes

<bdenes@scylladb.com>
Nov 24, 2020, 11:07:32 AM
to scylladb-dev@googlegroups.com, Botond Dénes
So that the caller can determine whether a given position is inside
the current fast-forward-to window.

Signed-off-by: Botond Dénes <bde...@scylladb.com>
---
sstables/mp_row_consumer.hh | 8 ++++++++
1 file changed, 8 insertions(+)

Botond Dénes

<bdenes@scylladb.com>
Nov 24, 2020, 11:07:33 AM
to scylladb-dev@googlegroups.com, Botond Dénes
In scrub mode the sstable reader tries hard not to fail on parse
errors. When a parse error happens, it tries to skip to the closest
indexed position in the sstable file. For this it first calls
`index_reader::advance_to_after()` to see if there is such a position in
the current partition. If there is none, it skips to the next partition.
The reader gains a new constructor parameter which can be used to turn
this on (it defaults to `scrub_mode::no`). Scrub compaction will use
this mode to salvage data from corrupted sstables.

Signed-off-by: Botond Dénes <bde...@scylladb.com>
---
sstables/partition.cc | 98 +++++++++++++++++++++++++++++++++++++++----
1 file changed, 89 insertions(+), 9 deletions(-)

Botond Dénes

<bdenes@scylladb.com>
Nov 24, 2020, 11:07:34 AM
to scylladb-dev@googlegroups.com
A patch is missing, resending.