[QUEUED scylladb next] Merge 'mutation_fragment_stream_validator: various API improvements' from Botond Dénes

0 views
Skip to first unread message

Commit Bot

<bot@cloudius-systems.com>
unread,
Oct 3, 2022, 10:24:12 AM10/3/22
to scylladb-dev@googlegroups.com, Tomasz Grabiec
From: Tomasz Grabiec <tgra...@scylladb.com>
Committer: Tomasz Grabiec <tgra...@scylladb.com>
Branch: next

Merge 'mutation_fragment_stream_validator: various API improvements' from Botond Dénes

The low-level `mutation_fragment_stream_validator` gets `reset()` methods that until now only the high-level `mutation_fragment_stream_validating_filter` had.
Active tombstone validation is pushed down to the low level validator.
The low-level validator, which until now was a pain to use because it was very fussy about which subset of its API was used, is made much more robust: users are no longer required to stick to a single subset of its API.

Closes #11614

* github.com:scylladb/scylladb:
mutation_fragment_stream_validator: make interface more robust
mutation_fragment_stream_validator: add reset() to validating filter
mutation_fragment_stream_validator: move active tombstone validation into low level validator

---
diff --git a/mutation_fragment_stream_validator.hh b/mutation_fragment_stream_validator.hh
--- a/mutation_fragment_stream_validator.hh
+++ b/mutation_fragment_stream_validator.hh
@@ -20,15 +20,18 @@ enum class mutation_fragment_stream_validation_level {
/// Low level fragment stream validator.
///
/// Tracks and validates the monotonicity of the passed in fragment kinds,
-/// position in partition, token or partition keys. Any subset of these
-/// can be used, but what is used have to be consistent across the entire
-/// stream.
+/// position in partition, token or partition keys.
class mutation_fragment_stream_validator {
const ::schema& _schema;
mutation_fragment_v2::kind _prev_kind;
position_in_partition _prev_pos;
dht::decorated_key _prev_partition_key;
tombstone _current_tombstone;
+
+private:
+ bool validate(dht::token t, const partition_key* pkey);
+ bool validate(mutation_fragment_v2::kind kind, std::optional<position_in_partition_view> pos,
+ std::optional<tombstone> new_current_tombstone);
public:
explicit mutation_fragment_stream_validator(const schema& s);

@@ -41,9 +44,11 @@ public:
/// `operator()(const mutation_fragment&)` is not desired.
/// Using both overloads for the same stream is not supported.
/// Advances the previous fragment kind, but only if the validation passes.
+ /// `new_current_tombstone` should be engaged only when the fragment changes
+ /// the current tombstone (range tombstone change fragments).
///
/// \returns true if the fragment kind is valid.
- bool operator()(mutation_fragment_v2::kind kind);
+ bool operator()(mutation_fragment_v2::kind kind, std::optional<tombstone> new_current_tombstone);
bool operator()(mutation_fragment::kind kind);

/// Validates the monotonicity of the mutation fragment kind and position.
@@ -54,9 +59,11 @@ public:
/// Using both overloads for the same stream is not supported.
/// Advances the previous fragment kind and position-in-partition, but only
/// if the validation passes.
+ /// `new_current_tombstone` should be engaged only when the fragment changes
+ /// the current tombstone (range tombstone change fragments).
///
/// \returns true if the mutation fragment kind is valid.
- bool operator()(mutation_fragment_v2::kind kind, position_in_partition_view pos);
+ bool operator()(mutation_fragment_v2::kind kind, position_in_partition_view pos, std::optional<tombstone> new_current_tombstone);
bool operator()(mutation_fragment::kind kind, position_in_partition_view pos);

/// Validates the monotonicity of the mutation fragment.
@@ -104,6 +111,7 @@ public:
/// normally invalid and hence wouldn't advance the internal state. This
/// can be used by users that can correct such invalid streams and wish to
/// continue validating it.
+ void reset(mutation_fragment_v2::kind kind, position_in_partition_view pos, std::optional<tombstone> new_current_tombstone);
void reset(const mutation_fragment&);
void reset(const mutation_fragment_v2&);

@@ -119,26 +127,27 @@ public:
}
/// The previous valid position.
///
- /// Not meaningful, when operator()(position_in_partition_view) is not used.
+ /// Call only if operator()(position_in_partition_view) was used.
const position_in_partition& previous_position() const {
return _prev_pos;
}
/// Get the current effective tombstone
///
- /// Not meaningful, when operator()(mutation_fragment_v2) is not used.
+ /// Call only if operator()(mutation_fragment_v2) or
+ /// operator()(mutation_fragment_v2::kind, position_in_partition_view, std::optional<tombstone>)
+ /// was used.
tombstone current_tombstone() const {
return _current_tombstone;
}
/// The previous valid partition key.
///
- /// Only valid if `operator()(const dht::decorated_key&)` or
- /// `operator()(dht::token)` was used.
+ /// Call only if operator()(dht::token) or operator()(const dht::decorated_key&) was used.
dht::token previous_token() const {
return _prev_partition_key.token();
}
/// The previous valid partition key.
///
- /// Only valid if `operator()(const dht::decorated_key&)` was used.
+ /// Call only if operator()(const dht::decorated_key&) was used.
const dht::decorated_key& previous_partition_key() const {
return _prev_partition_key;
}
@@ -158,7 +167,6 @@ class mutation_fragment_stream_validating_filter {
mutation_fragment_stream_validator _validator;
sstring _name;
mutation_fragment_stream_validation_level _validation_level;
- tombstone _current_tombstone;

public:
/// Constructor.
@@ -169,11 +177,13 @@ public:
mutation_fragment_stream_validating_filter(sstring_view name, const schema& s, mutation_fragment_stream_validation_level level);

bool operator()(const dht::decorated_key& dk);
- bool operator()(mutation_fragment_v2::kind kind, position_in_partition_view pos);
+ bool operator()(mutation_fragment_v2::kind kind, position_in_partition_view pos, std::optional<tombstone> new_current_tombstone);
bool operator()(mutation_fragment::kind kind, position_in_partition_view pos);
/// Equivalent to `operator()(mf.kind(), mf.position())`
bool operator()(const mutation_fragment_v2& mv);
bool operator()(const mutation_fragment& mv);
+ void reset(mutation_fragment_v2::kind kind, position_in_partition_view pos, std::optional<tombstone> new_current_tombstone);
+ void reset(const mutation_fragment_v2& mf);
/// Equivalent to `operator()(partition_end{})`
bool on_end_of_partition();
void on_end_of_stream();
diff --git a/readers/mutation_reader.cc b/readers/mutation_reader.cc
--- a/readers/mutation_reader.cc
+++ b/readers/mutation_reader.cc
@@ -6,6 +6,8 @@
* SPDX-License-Identifier: AGPL-3.0-or-later
*/

+#include <seastar/util/lazy.hh>
+
#include "readers/flat_mutation_reader_v2.hh"
#include "mutation_rebuilder.hh"
#include "mutation_fragment_stream_validator.hh"
@@ -39,65 +41,46 @@ mutation_fragment_stream_validator::mutation_fragment_stream_validator(const ::s
, _prev_partition_key(dht::minimum_token(), partition_key::make_empty()) {
}

-bool mutation_fragment_stream_validator::operator()(const dht::decorated_key& dk) {
- if (_prev_partition_key.less_compare(_schema, dk)) {
- _prev_partition_key = dk;
- return true;
+bool mutation_fragment_stream_validator::validate(dht::token t, const partition_key* pkey) {
+ if (_prev_partition_key.token() > t) {
+ return false;
+ }
+ partition_key::tri_compare cmp(_schema);
+ if (_prev_partition_key.token() == t && pkey && cmp(_prev_partition_key.key(), *pkey) >= 0) {
+ return false;
+ }
+ _prev_partition_key._token = t;
+ if (pkey) {
+ _prev_partition_key._key = *pkey;
+ } else {
+ // If new partition-key is not supplied, we reset it to empty one, which
+ // will compare less than any other key, making sure we don't attempt to
+ // compare partition-keys belonging to different tokens.
+ if (!_prev_partition_key.key().is_empty()) {
+ _prev_partition_key._key = partition_key::make_empty();
+ }
}
- return false;
+ return true;
+}
+
+bool mutation_fragment_stream_validator::operator()(const dht::decorated_key& dk) {
+ return validate(dk.token(), &dk.key());
}

bool mutation_fragment_stream_validator::operator()(dht::token t) {
- if (_prev_partition_key.token() <= t) {
- _prev_partition_key._token = t;
- return true;
- }
- return false;
+ return validate(t, nullptr);
}

-bool mutation_fragment_stream_validator::operator()(mutation_fragment_v2::kind kind, position_in_partition_view pos) {
+bool mutation_fragment_stream_validator::validate(mutation_fragment_v2::kind kind, std::optional<position_in_partition_view> pos,
+ std::optional<tombstone> new_current_tombstone) {
+ // Check for unclosed range tombstone on partition end
if (kind == mutation_fragment_v2::kind::partition_end && _current_tombstone) {
return false;
}
- if (_prev_kind == mutation_fragment_v2::kind::partition_end) {
- const bool valid = (kind == mutation_fragment_v2::kind::partition_start);
- if (valid) {
- _prev_kind = mutation_fragment_v2::kind::partition_start;
- _prev_pos = pos;
- }
- return valid;
- }
- auto cmp = position_in_partition::tri_compare(_schema);
- auto res = cmp(_prev_pos, pos);
- bool valid = true;
- if (_prev_kind == mutation_fragment_v2::kind::range_tombstone_change) {
- valid = res <= 0;
- } else {
- valid = res < 0;
- }
- if (valid) {
- _prev_kind = kind;
- _prev_pos = pos;
- }
- return valid;
-}
-bool mutation_fragment_stream_validator::operator()(mutation_fragment::kind kind, position_in_partition_view pos) {
- return (*this)(to_mutation_fragment_kind_v2(kind), pos);
-}

-bool mutation_fragment_stream_validator::operator()(const mutation_fragment_v2& mf) {
- const auto valid = (*this)(mf.mutation_fragment_kind(), mf.position());
- if (valid && mf.is_range_tombstone_change()) {
- _current_tombstone = mf.as_range_tombstone_change().tombstone();
- }
- return valid;
-}
-bool mutation_fragment_stream_validator::operator()(const mutation_fragment& mf) {
- return (*this)(to_mutation_fragment_kind_v2(mf.mutation_fragment_kind()), mf.position());
-}
+ auto valid = true;

-bool mutation_fragment_stream_validator::operator()(mutation_fragment_v2::kind kind) {
- bool valid = true;
+ // Check fragment kind order
switch (_prev_kind) {
case mutation_fragment_v2::kind::partition_start:
valid = kind != mutation_fragment_v2::kind::partition_start;
@@ -112,38 +95,98 @@ bool mutation_fragment_stream_validator::operator()(mutation_fragment_v2::kind k
valid = kind == mutation_fragment_v2::kind::partition_start;
break;
}
- if (valid) {
- _prev_kind = kind;
+ if (!valid) {
+ return false;
+ }
+
+ if (pos && _prev_kind != mutation_fragment_v2::kind::partition_end) {
+ auto cmp = position_in_partition::tri_compare(_schema);
+ auto res = cmp(_prev_pos, *pos);
+ if (_prev_kind == mutation_fragment_v2::kind::range_tombstone_change) {
+ valid = res <= 0;
+ } else {
+ valid = res < 0;
+ }
+ if (!valid) {
+ return false;
+ }
}
- return valid;
+
+ _prev_kind = kind;
+ if (pos) {
+ _prev_pos = *pos;
+ } else {
+ switch (kind) {
+ case mutation_fragment_v2::kind::partition_start:
+ _prev_pos = position_in_partition(position_in_partition::partition_start_tag_t{});
+ break;
+ case mutation_fragment_v2::kind::static_row:
+ _prev_pos = position_in_partition(position_in_partition::static_row_tag_t{});
+ break;
+ case mutation_fragment_v2::kind::clustering_row:
+ [[fallthrough]];
+ case mutation_fragment_v2::kind::range_tombstone_change:
+ if (_prev_pos.region() != partition_region::clustered) { // don't move pos if it is already a clustering one
+ _prev_pos = position_in_partition(position_in_partition::before_clustering_row_tag_t{}, clustering_key::make_empty());
+ }
+ break;
+ case mutation_fragment_v2::kind::partition_end:
+ _prev_pos = position_in_partition(position_in_partition::end_of_partition_tag_t{});
+ break;
+ }
+ }
+ if (new_current_tombstone) {
+ _current_tombstone = *new_current_tombstone;
+ }
+ return true;
+}
+
+bool mutation_fragment_stream_validator::operator()(mutation_fragment_v2::kind kind, position_in_partition_view pos,
+ std::optional<tombstone> new_current_tombstone) {
+ return validate(kind, pos, new_current_tombstone);
+}
+bool mutation_fragment_stream_validator::operator()(mutation_fragment::kind kind, position_in_partition_view pos) {
+ return validate(to_mutation_fragment_kind_v2(kind), pos, {});
+}
+
+bool mutation_fragment_stream_validator::operator()(const mutation_fragment_v2& mf) {
+ return validate(mf.mutation_fragment_kind(), mf.position(),
+ mf.is_range_tombstone_change() ? std::optional(mf.as_range_tombstone_change().tombstone()) : std::nullopt);
+}
+bool mutation_fragment_stream_validator::operator()(const mutation_fragment& mf) {
+ return validate(to_mutation_fragment_kind_v2(mf.mutation_fragment_kind()), mf.position(), {});
+}
+
+bool mutation_fragment_stream_validator::operator()(mutation_fragment_v2::kind kind, std::optional<tombstone> new_current_tombstone) {
+ return validate(kind, {}, new_current_tombstone);
}
bool mutation_fragment_stream_validator::operator()(mutation_fragment::kind kind) {
- return (*this)(to_mutation_fragment_kind_v2(kind));
+ return validate(to_mutation_fragment_kind_v2(kind), {}, {});
}

bool mutation_fragment_stream_validator::on_end_of_stream() {
return _prev_kind == mutation_fragment_v2::kind::partition_end;
}

void mutation_fragment_stream_validator::reset(dht::decorated_key dk) {
- _prev_partition_key = dk;
+ _prev_partition_key = std::move(dk);
_prev_pos = position_in_partition::for_partition_start();
_prev_kind = mutation_fragment_v2::kind::partition_start;
_current_tombstone = {};
}

-void mutation_fragment_stream_validator::reset(const mutation_fragment_v2& mf) {
- _prev_pos = mf.position();
- _prev_kind = mf.mutation_fragment_kind();
- if (mf.is_range_tombstone_change()) {
- _current_tombstone = mf.as_range_tombstone_change().tombstone();
- } else {
- _current_tombstone = {};
+void mutation_fragment_stream_validator::reset(mutation_fragment_v2::kind kind, position_in_partition_view pos, std::optional<tombstone> new_current_tombstone) {
+ _prev_pos = pos;
+ _prev_kind = kind;
+ if (new_current_tombstone) {
+ _current_tombstone = *new_current_tombstone;
}
}
+void mutation_fragment_stream_validator::reset(const mutation_fragment_v2& mf) {
+ reset(mf.mutation_fragment_kind(), mf.position(), mf.is_range_tombstone_change() ? std::optional(mf.as_range_tombstone_change().tombstone()) : std::nullopt);
+}
void mutation_fragment_stream_validator::reset(const mutation_fragment& mf) {
- _prev_pos = mf.position();
- _prev_kind = to_mutation_fragment_kind_v2(mf.mutation_fragment_kind());
+ reset(to_mutation_fragment_kind_v2(mf.mutation_fragment_kind()), mf.position(), std::nullopt);
}

namespace {
@@ -203,20 +246,16 @@ mutation_fragment_stream_validating_filter::mutation_fragment_stream_validating_
}
}

-bool mutation_fragment_stream_validating_filter::operator()(mutation_fragment_v2::kind kind, position_in_partition_view pos) {
+bool mutation_fragment_stream_validating_filter::operator()(mutation_fragment_v2::kind kind, position_in_partition_view pos,
+ std::optional<tombstone> new_current_tombstone) {
bool valid = false;

- mrlog.debug("[validator {}] {}:{}", static_cast<void*>(this), kind, pos);
-
- if (kind == mutation_fragment_v2::kind::partition_end && _current_tombstone) {
- on_validation_error(mrlog, format("[validator {} for {}] Unexpected active tombstone at partition-end: partition key {}: tombstone {}",
- static_cast<void*>(this), _name, _validator.previous_partition_key(), _current_tombstone));
- }
+ mrlog.debug("[validator {}] {}:{} new_current_tombstone: {}", static_cast<void*>(this), kind, pos, new_current_tombstone);

if (_validation_level >= mutation_fragment_stream_validation_level::clustering_key) {
- valid = _validator(kind, pos);
+ valid = _validator(kind, pos, new_current_tombstone);
} else {
- valid = _validator(kind);
+ valid = _validator(kind, new_current_tombstone);
}

if (__builtin_expect(!valid, false)) {
@@ -226,6 +265,9 @@ bool mutation_fragment_stream_validating_filter::operator()(mutation_fragment_v2
} else if (_validation_level >= mutation_fragment_stream_validation_level::partition_key) {
on_validation_error(mrlog, format("[validator {} for {}] Unexpected mutation fragment: partition key {}: previous {}, current {}",
static_cast<void*>(this), _name, _validator.previous_partition_key(), _validator.previous_mutation_fragment_kind(), kind));
+ } else if (kind == mutation_fragment_v2::kind::partition_end && _validator.current_tombstone()) {
+ on_validation_error(mrlog, format("[validator {} for {}] Partition ended with active tombstone: {}",
+ static_cast<void*>(this), _name, _validator.current_tombstone()));
} else {
on_validation_error(mrlog, format("[validator {} for {}] Unexpected mutation fragment: previous {}, current {}",
static_cast<void*>(this), _name, _validator.previous_mutation_fragment_kind(), kind));
@@ -236,18 +278,35 @@ bool mutation_fragment_stream_validating_filter::operator()(mutation_fragment_v2
}

bool mutation_fragment_stream_validating_filter::operator()(mutation_fragment::kind kind, position_in_partition_view pos) {
- return (*this)(to_mutation_fragment_kind_v2(kind), pos);
+ return (*this)(to_mutation_fragment_kind_v2(kind), pos, {});
}

bool mutation_fragment_stream_validating_filter::operator()(const mutation_fragment_v2& mv) {
- auto valid = (*this)(mv.mutation_fragment_kind(), mv.position());
- if (valid && mv.is_range_tombstone_change()) {
- _current_tombstone = mv.as_range_tombstone_change().tombstone();
- }
- return valid;
+ return (*this)(mv.mutation_fragment_kind(), mv.position(),
+ mv.is_range_tombstone_change() ? std::optional(mv.as_range_tombstone_change().tombstone()) : std::nullopt);
}
bool mutation_fragment_stream_validating_filter::operator()(const mutation_fragment& mv) {
- return (*this)(to_mutation_fragment_kind_v2(mv.mutation_fragment_kind()), mv.position());
+ return (*this)(to_mutation_fragment_kind_v2(mv.mutation_fragment_kind()), mv.position(), {});
+}
+
+void mutation_fragment_stream_validating_filter::reset(mutation_fragment_v2::kind kind, position_in_partition_view pos,
+ std::optional<tombstone> new_current_tombstone) {
+ mrlog.debug("[validator {}] reset to {} @ {}{}", static_cast<const void*>(this), kind, pos, value_of([t = new_current_tombstone] () -> sstring {
+ if (!t) {
+ return "";
+ }
+ return format(" (new tombstone: {})", *t);
+ }));
+ _validator.reset(kind, pos, new_current_tombstone);
+}
+void mutation_fragment_stream_validating_filter::reset(const mutation_fragment_v2& mf) {
+ mrlog.debug("[validator {}] reset to {} @ {}{}", static_cast<const void*>(this), mf.mutation_fragment_kind(), mf.position(), value_of([&mf] () -> sstring {
+ if (!mf.is_range_tombstone_change()) {
+ return "";
+ }
+ return format(" (new tombstone: {})", mf.as_range_tombstone_change().tombstone());
+ }));
+ _validator.reset(mf);
}

bool mutation_fragment_stream_validating_filter::on_end_of_partition() {
diff --git a/sstables/writer.cc b/sstables/writer.cc
--- a/sstables/writer.cc
+++ b/sstables/writer.cc
@@ -30,7 +30,7 @@ sstable_writer::sstable_writer(sstable& sst, const schema& s, uint64_t estimated

void sstable_writer::consume_new_partition(const dht::decorated_key& dk) {
_impl->_validator(dk);
- _impl->_validator(mutation_fragment::kind::partition_start, position_in_partition_view(position_in_partition_view::partition_start_tag_t{}));
+ _impl->_validator(mutation_fragment_v2::kind::partition_start, position_in_partition_view(position_in_partition_view::partition_start_tag_t{}), {});
_impl->_sst.get_stats().on_partition_write();
return _impl->consume_new_partition(dk);
}
@@ -41,21 +41,21 @@ void sstable_writer::consume(tombstone t) {
}

stop_iteration sstable_writer::consume(static_row&& sr) {
- _impl->_validator(mutation_fragment::kind::static_row, sr.position());
+ _impl->_validator(mutation_fragment_v2::kind::static_row, sr.position(), {});
if (!sr.empty()) {
_impl->_sst.get_stats().on_static_row_write();
}
return _impl->consume(std::move(sr));
}

stop_iteration sstable_writer::consume(clustering_row&& cr) {
- _impl->_validator(mutation_fragment::kind::clustering_row, cr.position());
+ _impl->_validator(mutation_fragment_v2::kind::clustering_row, cr.position(), {});
_impl->_sst.get_stats().on_row_write();
return _impl->consume(std::move(cr));
}

stop_iteration sstable_writer::consume(range_tombstone_change&& rtc) {
- _impl->_validator(mutation_fragment_v2::kind::range_tombstone_change, rtc.position());
+ _impl->_validator(mutation_fragment_v2::kind::range_tombstone_change, rtc.position(), rtc.tombstone());
_impl->_sst.get_stats().on_range_tombstone_write();
return _impl->consume(std::move(rtc));
}
diff --git a/test/boost/mutation_fragment_test.cc b/test/boost/mutation_fragment_test.cc
--- a/test/boost/mutation_fragment_test.cc
+++ b/test/boost/mutation_fragment_test.cc
@@ -605,3 +605,41 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_fragment_stream_validator) {
check_invalid_after(rtc, {&ps, &sr});
check_invalid_after(pe, {&sr, &cr, &rtc, &pe});
}
+
+SEASTAR_THREAD_TEST_CASE(test_mutation_fragment_stream_validator_mixed_api_usage) {
+ simple_schema ss;
+
+ const auto dkeys = ss.make_pkeys(3);
+ const auto& dk_ = dkeys[0];
+ const auto& dk0 = dkeys[1];
+ const auto& dk1 = dkeys[2];
+ const auto ck0 = ss.make_ckey(0);
+ const auto ck1 = ss.make_ckey(1);
+ const auto ck2 = ss.make_ckey(2);
+ const auto ck3 = ss.make_ckey(3);
+
+ reader_concurrency_semaphore sem(reader_concurrency_semaphore::for_tests{}, get_name(), 1, 100);
+ auto stop_sem = deferred_stop(sem);
+ auto permit = sem.make_tracking_only_permit(ss.schema().get(), get_name(), db::no_timeout);
+
+ mutation_fragment_stream_validator validator(*ss.schema());
+
+ using mf_kind = mutation_fragment_v2::kind;
+
+ BOOST_REQUIRE(validator(mf_kind::partition_start, {}));
+ BOOST_REQUIRE(validator(dk_.token()));
+ BOOST_REQUIRE(validator(mf_kind::static_row, position_in_partition_view(position_in_partition_view::static_row_tag_t{}), {}));
+ BOOST_REQUIRE(validator(mf_kind::clustering_row, {}));
+ BOOST_REQUIRE(validator(mf_kind::clustering_row, {}));
+ BOOST_REQUIRE(validator(mf_kind::clustering_row, position_in_partition_view::for_key(ck0), {}));
+ BOOST_REQUIRE(validator(mf_kind::clustering_row, {}));
+ BOOST_REQUIRE(!validator(mf_kind::clustering_row, position_in_partition_view::for_key(ck0), {}));
+ BOOST_REQUIRE(validator(mf_kind::clustering_row, {}));
+ BOOST_REQUIRE(validator(mf_kind::clustering_row, position_in_partition_view::for_key(ck1), {}));
+ BOOST_REQUIRE(validator(mf_kind::clustering_row, {}));
+ BOOST_REQUIRE(validator(mf_kind::range_tombstone_change, position_in_partition_view::after_key(ck1), {}));
+ BOOST_REQUIRE(validator(mf_kind::range_tombstone_change, position_in_partition_view::after_key(ck1), {}));
+ BOOST_REQUIRE(validator(mf_kind::partition_end, {}));
+ BOOST_REQUIRE(validator(dk0));
+ BOOST_REQUIRE(!validator(dk0));
+}
diff --git a/test/lib/mutation_assertions.hh b/test/lib/mutation_assertions.hh
--- a/test/lib/mutation_assertions.hh
+++ b/test/lib/mutation_assertions.hh
@@ -205,27 +205,27 @@ public:

void consume_new_partition(const dht::decorated_key& dk) {
testlog.debug("consume new partition: {}", dk);
- BOOST_REQUIRE(_validator(mutation_fragment_v2::kind::partition_start, position_in_partition_view(position_in_partition_view::partition_start_tag_t{})));
+ BOOST_REQUIRE(_validator(mutation_fragment_v2::kind::partition_start, position_in_partition_view(position_in_partition_view::partition_start_tag_t{}), {}));
}
void consume(tombstone) { }
stop_iteration consume(static_row&& sr) {
testlog.debug("consume static_row");
- BOOST_REQUIRE(_validator(mutation_fragment_v2::kind::static_row, sr.position()));
+ BOOST_REQUIRE(_validator(mutation_fragment_v2::kind::static_row, sr.position(), {}));
return stop_iteration::no;
}
stop_iteration consume(clustering_row&& cr) {
testlog.debug("consume clustering_row: {}", cr.key());
- BOOST_REQUIRE(_validator(mutation_fragment_v2::kind::clustering_row, cr.position()));
+ BOOST_REQUIRE(_validator(mutation_fragment_v2::kind::clustering_row, cr.position(), {}));
return stop_iteration::no;
}
stop_iteration consume(range_tombstone_change&& rtc) {
testlog.debug("consume range_tombstone_change: {} {}", rtc.position(), rtc.tombstone());
- BOOST_REQUIRE(_validator(mutation_fragment_v2::kind::range_tombstone_change, rtc.position()));
+ BOOST_REQUIRE(_validator(mutation_fragment_v2::kind::range_tombstone_change, rtc.position(), rtc.tombstone()));
return stop_iteration::no;
}
stop_iteration consume_end_of_partition() {
testlog.debug("consume end of partition");
- BOOST_REQUIRE(_validator(mutation_fragment_v2::kind::partition_end, position_in_partition_view(position_in_partition_view::end_of_partition_tag_t{})));
+ BOOST_REQUIRE(_validator(mutation_fragment_v2::kind::partition_end, position_in_partition_view(position_in_partition_view::end_of_partition_tag_t{}), {}));
return stop_iteration::no;
}
void consume_end_of_stream() {

Commit Bot

<bot@cloudius-systems.com>
unread,
Oct 3, 2022, 7:26:51 PM10/3/22
to scylladb-dev@googlegroups.com, Tomasz Grabiec
From: Tomasz Grabiec <tgra...@scylladb.com>
Committer: Tomasz Grabiec <tgra...@scylladb.com>
Branch: master
Reply all
Reply to author
Forward
0 new messages