[QUEUED scylladb next] mutation_fragment_stream_validator: move active tomsbtone validation into low level validator

0 views
Skip to first unread message

Commit Bot

<bot@cloudius-systems.com>
unread,
Oct 3, 2022, 10:24:08 AM10/3/22
to scylladb-dev@googlegroups.com, Botond Dénes
From: Botond Dénes <bde...@scylladb.com>
Committer: Botond Dénes <bde...@scylladb.com>
Branch: next

mutation_fragment_stream_validator: move active tomsbtone validation into low level validator

Currently the active range tombstone change is validated in the high
level `mutation_fragment_stream_validating_stream`, meaning that users of
the low-level `mutation_fragment_stream_validator` don't benefit from
checking that tombstones are properly closed.
This patch moves the validation down to the low-level validator (which
is what the high-level one uses under the hood too), and requires all
users to pass information about changes to the active tombstone for each
fragment.

---
diff --git a/mutation_fragment_stream_validator.hh b/mutation_fragment_stream_validator.hh
--- a/mutation_fragment_stream_validator.hh
+++ b/mutation_fragment_stream_validator.hh
@@ -41,9 +41,11 @@ public:
/// `operator()(const mutation_fragment&)` is not desired.
/// Using both overloads for the same stream is not supported.
/// Advances the previous fragment kind, but only if the validation passes.
+ /// `new_current_tombstone` should be engaged only when the fragment changes
+ /// the current tombstone (range tombstone change fragments).
///
/// \returns true if the fragment kind is valid.
- bool operator()(mutation_fragment_v2::kind kind);
+ bool operator()(mutation_fragment_v2::kind kind, std::optional<tombstone> new_current_tombstone);
bool operator()(mutation_fragment::kind kind);

/// Validates the monotonicity of the mutation fragment kind and position.
@@ -54,9 +56,11 @@ public:
/// Using both overloads for the same stream is not supported.
/// Advances the previous fragment kind and position-in-partition, but only
/// if the validation passes.
+ /// `new_current_tombstone` should be engaged only when the fragment changes
+ /// the current tombstone (range tombstone change fragments).
///
/// \returns true if the mutation fragment kind is valid.
- bool operator()(mutation_fragment_v2::kind kind, position_in_partition_view pos);
+ bool operator()(mutation_fragment_v2::kind kind, position_in_partition_view pos, std::optional<tombstone> new_current_tombstone);
bool operator()(mutation_fragment::kind kind, position_in_partition_view pos);

/// Validates the monotonicity of the mutation fragment.
@@ -158,7 +162,6 @@ class mutation_fragment_stream_validating_filter {
mutation_fragment_stream_validator _validator;
sstring _name;
mutation_fragment_stream_validation_level _validation_level;
- tombstone _current_tombstone;

public:
/// Constructor.
@@ -169,7 +172,7 @@ public:
mutation_fragment_stream_validating_filter(sstring_view name, const schema& s, mutation_fragment_stream_validation_level level);

bool operator()(const dht::decorated_key& dk);
- bool operator()(mutation_fragment_v2::kind kind, position_in_partition_view pos);
+ bool operator()(mutation_fragment_v2::kind kind, position_in_partition_view pos, std::optional<tombstone> new_current_tombstone);
bool operator()(mutation_fragment::kind kind, position_in_partition_view pos);
/// Equivalent to `operator()(mf.kind(), mf.position())`
bool operator()(const mutation_fragment_v2& mv);
diff --git a/readers/mutation_reader.cc b/readers/mutation_reader.cc
--- a/readers/mutation_reader.cc
+++ b/readers/mutation_reader.cc
@@ -55,7 +55,8 @@ bool mutation_fragment_stream_validator::operator()(dht::token t) {
return false;
}

-bool mutation_fragment_stream_validator::operator()(mutation_fragment_v2::kind kind, position_in_partition_view pos) {
+bool mutation_fragment_stream_validator::operator()(mutation_fragment_v2::kind kind, position_in_partition_view pos,
+ std::optional<tombstone> new_current_tombstone) {
if (kind == mutation_fragment_v2::kind::partition_end && _current_tombstone) {
return false;
}
@@ -64,6 +65,7 @@ bool mutation_fragment_stream_validator::operator()(mutation_fragment_v2::kind k
if (valid) {
_prev_kind = mutation_fragment_v2::kind::partition_start;
_prev_pos = pos;
+ _current_tombstone = new_current_tombstone.value_or(_current_tombstone);
}
return valid;
}
@@ -78,25 +80,23 @@ bool mutation_fragment_stream_validator::operator()(mutation_fragment_v2::kind k
if (valid) {
_prev_kind = kind;
_prev_pos = pos;
+ _current_tombstone = new_current_tombstone.value_or(_current_tombstone);
}
return valid;
}
bool mutation_fragment_stream_validator::operator()(mutation_fragment::kind kind, position_in_partition_view pos) {
- return (*this)(to_mutation_fragment_kind_v2(kind), pos);
+ return (*this)(to_mutation_fragment_kind_v2(kind), pos, {});
}

bool mutation_fragment_stream_validator::operator()(const mutation_fragment_v2& mf) {
- const auto valid = (*this)(mf.mutation_fragment_kind(), mf.position());
- if (valid && mf.is_range_tombstone_change()) {
- _current_tombstone = mf.as_range_tombstone_change().tombstone();
- }
- return valid;
+ return (*this)(mf.mutation_fragment_kind(), mf.position(),
+ mf.is_range_tombstone_change() ? std::optional(mf.as_range_tombstone_change().tombstone()) : std::nullopt);
}
bool mutation_fragment_stream_validator::operator()(const mutation_fragment& mf) {
- return (*this)(to_mutation_fragment_kind_v2(mf.mutation_fragment_kind()), mf.position());
+ return (*this)(to_mutation_fragment_kind_v2(mf.mutation_fragment_kind()), mf.position(), {});
}

-bool mutation_fragment_stream_validator::operator()(mutation_fragment_v2::kind kind) {
+bool mutation_fragment_stream_validator::operator()(mutation_fragment_v2::kind kind, std::optional<tombstone> new_current_tombstone) {
bool valid = true;
switch (_prev_kind) {
case mutation_fragment_v2::kind::partition_start:
@@ -112,13 +112,17 @@ bool mutation_fragment_stream_validator::operator()(mutation_fragment_v2::kind k
valid = kind == mutation_fragment_v2::kind::partition_start;
break;
}
+ if (kind == mutation_fragment_v2::kind::partition_end) {
+ valid &= !_current_tombstone;
+ }
if (valid) {
_prev_kind = kind;
+ _current_tombstone = new_current_tombstone.value_or(_current_tombstone);
}
return valid;
}
bool mutation_fragment_stream_validator::operator()(mutation_fragment::kind kind) {
- return (*this)(to_mutation_fragment_kind_v2(kind));
+ return (*this)(to_mutation_fragment_kind_v2(kind), {});
}

bool mutation_fragment_stream_validator::on_end_of_stream() {
@@ -203,20 +207,16 @@ mutation_fragment_stream_validating_filter::mutation_fragment_stream_validating_
}
}

-bool mutation_fragment_stream_validating_filter::operator()(mutation_fragment_v2::kind kind, position_in_partition_view pos) {
+bool mutation_fragment_stream_validating_filter::operator()(mutation_fragment_v2::kind kind, position_in_partition_view pos,
+ std::optional<tombstone> new_current_tombstone) {
bool valid = false;

- mrlog.debug("[validator {}] {}:{}", static_cast<void*>(this), kind, pos);
-
- if (kind == mutation_fragment_v2::kind::partition_end && _current_tombstone) {
- on_validation_error(mrlog, format("[validator {} for {}] Unexpected active tombstone at partition-end: partition key {}: tombstone {}",
- static_cast<void*>(this), _name, _validator.previous_partition_key(), _current_tombstone));
- }
+ mrlog.debug("[validator {}] {}:{} new_current_tombstone: {}", static_cast<void*>(this), kind, pos, new_current_tombstone);

if (_validation_level >= mutation_fragment_stream_validation_level::clustering_key) {
- valid = _validator(kind, pos);
+ valid = _validator(kind, pos, new_current_tombstone);
} else {
- valid = _validator(kind);
+ valid = _validator(kind, new_current_tombstone);
}

if (__builtin_expect(!valid, false)) {
@@ -226,6 +226,9 @@ bool mutation_fragment_stream_validating_filter::operator()(mutation_fragment_v2
} else if (_validation_level >= mutation_fragment_stream_validation_level::partition_key) {
on_validation_error(mrlog, format("[validator {} for {}] Unexpected mutation fragment: partition key {}: previous {}, current {}",
static_cast<void*>(this), _name, _validator.previous_partition_key(), _validator.previous_mutation_fragment_kind(), kind));
+ } else if (kind == mutation_fragment_v2::kind::partition_end && _validator.current_tombstone()) {
+ on_validation_error(mrlog, format("[validator {} for {}] Partition ended with active tombstone: {}",
+ static_cast<void*>(this), _name, _validator.current_tombstone()));
} else {
on_validation_error(mrlog, format("[validator {} for {}] Unexpected mutation fragment: previous {}, current {}",
static_cast<void*>(this), _name, _validator.previous_mutation_fragment_kind(), kind));
@@ -236,18 +239,15 @@ bool mutation_fragment_stream_validating_filter::operator()(mutation_fragment_v2
}

bool mutation_fragment_stream_validating_filter::operator()(mutation_fragment::kind kind, position_in_partition_view pos) {
- return (*this)(to_mutation_fragment_kind_v2(kind), pos);
+ return (*this)(to_mutation_fragment_kind_v2(kind), pos, {});
}

bool mutation_fragment_stream_validating_filter::operator()(const mutation_fragment_v2& mv) {
- auto valid = (*this)(mv.mutation_fragment_kind(), mv.position());
- if (valid && mv.is_range_tombstone_change()) {
- _current_tombstone = mv.as_range_tombstone_change().tombstone();
- }
- return valid;
+ return (*this)(mv.mutation_fragment_kind(), mv.position(),
+ mv.is_range_tombstone_change() ? std::optional(mv.as_range_tombstone_change().tombstone()) : std::nullopt);
}
bool mutation_fragment_stream_validating_filter::operator()(const mutation_fragment& mv) {
- return (*this)(to_mutation_fragment_kind_v2(mv.mutation_fragment_kind()), mv.position());
+ return (*this)(to_mutation_fragment_kind_v2(mv.mutation_fragment_kind()), mv.position(), {});
}

bool mutation_fragment_stream_validating_filter::on_end_of_partition() {
diff --git a/sstables/writer.cc b/sstables/writer.cc
--- a/sstables/writer.cc
+++ b/sstables/writer.cc
@@ -30,7 +30,7 @@ sstable_writer::sstable_writer(sstable& sst, const schema& s, uint64_t estimated

void sstable_writer::consume_new_partition(const dht::decorated_key& dk) {
_impl->_validator(dk);
- _impl->_validator(mutation_fragment::kind::partition_start, position_in_partition_view(position_in_partition_view::partition_start_tag_t{}));
+ _impl->_validator(mutation_fragment_v2::kind::partition_start, position_in_partition_view(position_in_partition_view::partition_start_tag_t{}), {});
_impl->_sst.get_stats().on_partition_write();
return _impl->consume_new_partition(dk);
}
@@ -41,21 +41,21 @@ void sstable_writer::consume(tombstone t) {
}

stop_iteration sstable_writer::consume(static_row&& sr) {
- _impl->_validator(mutation_fragment::kind::static_row, sr.position());
+ _impl->_validator(mutation_fragment_v2::kind::static_row, sr.position(), {});
if (!sr.empty()) {
_impl->_sst.get_stats().on_static_row_write();
}
return _impl->consume(std::move(sr));
}

stop_iteration sstable_writer::consume(clustering_row&& cr) {
- _impl->_validator(mutation_fragment::kind::clustering_row, cr.position());
+ _impl->_validator(mutation_fragment_v2::kind::clustering_row, cr.position(), {});
_impl->_sst.get_stats().on_row_write();
return _impl->consume(std::move(cr));
}

stop_iteration sstable_writer::consume(range_tombstone_change&& rtc) {
- _impl->_validator(mutation_fragment_v2::kind::range_tombstone_change, rtc.position());
+ _impl->_validator(mutation_fragment_v2::kind::range_tombstone_change, rtc.position(), rtc.tombstone());
_impl->_sst.get_stats().on_range_tombstone_write();
return _impl->consume(std::move(rtc));
}
diff --git a/test/lib/mutation_assertions.hh b/test/lib/mutation_assertions.hh
--- a/test/lib/mutation_assertions.hh
+++ b/test/lib/mutation_assertions.hh
@@ -205,27 +205,27 @@ public:

void consume_new_partition(const dht::decorated_key& dk) {
testlog.debug("consume new partition: {}", dk);
- BOOST_REQUIRE(_validator(mutation_fragment_v2::kind::partition_start, position_in_partition_view(position_in_partition_view::partition_start_tag_t{})));
+ BOOST_REQUIRE(_validator(mutation_fragment_v2::kind::partition_start, position_in_partition_view(position_in_partition_view::partition_start_tag_t{}), {}));
}
void consume(tombstone) { }
stop_iteration consume(static_row&& sr) {
testlog.debug("consume static_row");
- BOOST_REQUIRE(_validator(mutation_fragment_v2::kind::static_row, sr.position()));
+ BOOST_REQUIRE(_validator(mutation_fragment_v2::kind::static_row, sr.position(), {}));
return stop_iteration::no;
}
stop_iteration consume(clustering_row&& cr) {
testlog.debug("consume clustering_row: {}", cr.key());
- BOOST_REQUIRE(_validator(mutation_fragment_v2::kind::clustering_row, cr.position()));
+ BOOST_REQUIRE(_validator(mutation_fragment_v2::kind::clustering_row, cr.position(), {}));
return stop_iteration::no;
}
stop_iteration consume(range_tombstone_change&& rtc) {
testlog.debug("consume range_tombstone_change: {} {}", rtc.position(), rtc.tombstone());
- BOOST_REQUIRE(_validator(mutation_fragment_v2::kind::range_tombstone_change, rtc.position()));
+ BOOST_REQUIRE(_validator(mutation_fragment_v2::kind::range_tombstone_change, rtc.position(), rtc.tombstone()));
return stop_iteration::no;
}
stop_iteration consume_end_of_partition() {
testlog.debug("consume end of partition");
- BOOST_REQUIRE(_validator(mutation_fragment_v2::kind::partition_end, position_in_partition_view(position_in_partition_view::end_of_partition_tag_t{})));
+ BOOST_REQUIRE(_validator(mutation_fragment_v2::kind::partition_end, position_in_partition_view(position_in_partition_view::end_of_partition_tag_t{}), {}));
return stop_iteration::no;
}
void consume_end_of_stream() {

Commit Bot

<bot@cloudius-systems.com>
unread,
Oct 3, 2022, 7:26:47 PM10/3/22
to scylladb-dev@googlegroups.com, Botond Dénes
From: Botond Dénes <bde...@scylladb.com>
Committer: Botond Dénes <bde...@scylladb.com>
Branch: master
Reply all
Reply to author
Forward
0 new messages