[QUEUED scylladb next] mutation_fragment_stream_validator: make interface more robust

0 views
Skip to first unread message

Commit Bot

<bot@cloudius-systems.com>
unread,
Oct 3, 2022, 10:24:11 AM10/3/22
to scylladb-dev@googlegroups.com, Botond Dénes
From: Botond Dénes <bde...@scylladb.com>
Committer: Botond Dénes <bde...@scylladb.com>
Branch: next

mutation_fragment_stream_validator: make interface more robust

The validator has several API families with increasing amount of detail.
E.g. there is an `operator()(mutation_fragment_v2::kind)` and an
overload also taking a position. These different API families
currently cannot be mixed. If one uses one overload-set, one has to
stick with it; not doing so will generate false-positive failures.
This is hard to explain in documentation to users (provided they even
read it). Instead, just make the validator robust enough such that the
different API subsets can be mixed in any order. The validator will try
to make the most of the situation and validate as much as possible.
Behind the scenes all the different validation methods are consolidated
into just two: one for the partition level, the other for the
intra-partition level. All the different overloads just call these
methods passing as much information as they have.
A test is also added to make sure this works.

---
diff --git a/mutation_fragment_stream_validator.hh b/mutation_fragment_stream_validator.hh
--- a/mutation_fragment_stream_validator.hh
+++ b/mutation_fragment_stream_validator.hh
@@ -20,15 +20,18 @@ enum class mutation_fragment_stream_validation_level {
/// Low level fragment stream validator.
///
/// Tracks and validates the monotonicity of the passed in fragment kinds,
-/// position in partition, token or partition keys. Any subset of these
-/// can be used, but what is used have to be consistent across the entire
-/// stream.
+/// position in partition, token or partition keys.
class mutation_fragment_stream_validator {
const ::schema& _schema;
mutation_fragment_v2::kind _prev_kind;
position_in_partition _prev_pos;
dht::decorated_key _prev_partition_key;
tombstone _current_tombstone;
+
+private:
+ bool validate(dht::token t, const partition_key* pkey);
+ bool validate(mutation_fragment_v2::kind kind, std::optional<position_in_partition_view> pos,
+ std::optional<tombstone> new_current_tombstone);
public:
explicit mutation_fragment_stream_validator(const schema& s);

@@ -124,26 +127,27 @@ public:
}
/// The previous valid position.
///
- /// Not meaningful, when operator()(position_in_partition_view) is not used.
+ /// Call only if operator()(position_in_partition_view) was used.
const position_in_partition& previous_position() const {
return _prev_pos;
}
/// Get the current effective tombstone
///
- /// Not meaningful, when operator()(mutation_fragment_v2) is not used.
+ /// Call only if operator()(mutation_fragment_v2) or
+ /// operator()(mutation_fragment_v2::kind, position_in_partition_view, std::optional<tombstone>)
+ /// was used.
tombstone current_tombstone() const {
return _current_tombstone;
}
/// The previous valid token.
///
- /// Only valid if `operator()(const dht::decorated_key&)` or
- /// `operator()(dht::token)` was used.
+ /// Call only if operator()(dht::token) or operator()(const dht::decorated_key&) was used.
dht::token previous_token() const {
return _prev_partition_key.token();
}
/// The previous valid partition key.
///
- /// Only valid if `operator()(const dht::decorated_key&)` was used.
+ /// Call only if operator()(const dht::decorated_key&) was used.
const dht::decorated_key& previous_partition_key() const {
return _prev_partition_key;
}
diff --git a/readers/mutation_reader.cc b/readers/mutation_reader.cc
--- a/readers/mutation_reader.cc
+++ b/readers/mutation_reader.cc
@@ -41,65 +41,46 @@ mutation_fragment_stream_validator::mutation_fragment_stream_validator(const ::s
, _prev_partition_key(dht::minimum_token(), partition_key::make_empty()) {
}

-bool mutation_fragment_stream_validator::operator()(const dht::decorated_key& dk) {
- if (_prev_partition_key.less_compare(_schema, dk)) {
- _prev_partition_key = dk;
- return true;
+bool mutation_fragment_stream_validator::validate(dht::token t, const partition_key* pkey) {
+ if (_prev_partition_key.token() > t) {
+ return false;
+ }
+ partition_key::tri_compare cmp(_schema);
+ if (_prev_partition_key.token() == t && pkey && cmp(_prev_partition_key.key(), *pkey) >= 0) {
+ return false;
+ }
+ _prev_partition_key._token = t;
+ if (pkey) {
+ _prev_partition_key._key = *pkey;
+ } else {
+ // If new partition-key is not supplied, we reset it to empty one, which
+ // will compare less than any other key, making sure we don't attempt to
+ // compare partition-keys belonging to different tokens.
+ if (!_prev_partition_key.key().is_empty()) {
+ _prev_partition_key._key = partition_key::make_empty();
+ }
}
- return false;
+ return true;
+}
+
+bool mutation_fragment_stream_validator::operator()(const dht::decorated_key& dk) {
+ return validate(dk.token(), &dk.key());
}

bool mutation_fragment_stream_validator::operator()(dht::token t) {
- if (_prev_partition_key.token() <= t) {
- _prev_partition_key._token = t;
- return true;
- }
- return false;
+ return validate(t, nullptr);
}

-bool mutation_fragment_stream_validator::operator()(mutation_fragment_v2::kind kind, position_in_partition_view pos,
- std::optional<tombstone> new_current_tombstone) {
+bool mutation_fragment_stream_validator::validate(mutation_fragment_v2::kind kind, std::optional<position_in_partition_view> pos,
+ std::optional<tombstone> new_current_tombstone) {
+ // Check for unclosed range tombstone on partition end
if (kind == mutation_fragment_v2::kind::partition_end && _current_tombstone) {
return false;
}
- if (_prev_kind == mutation_fragment_v2::kind::partition_end) {
- const bool valid = (kind == mutation_fragment_v2::kind::partition_start);
- if (valid) {
- _prev_kind = mutation_fragment_v2::kind::partition_start;
- _prev_pos = pos;
- _current_tombstone = new_current_tombstone.value_or(_current_tombstone);
- }
- return valid;
- }
- auto cmp = position_in_partition::tri_compare(_schema);
- auto res = cmp(_prev_pos, pos);
- bool valid = true;
- if (_prev_kind == mutation_fragment_v2::kind::range_tombstone_change) {
- valid = res <= 0;
- } else {
- valid = res < 0;
- }
- if (valid) {
- _prev_kind = kind;
- _prev_pos = pos;
- _current_tombstone = new_current_tombstone.value_or(_current_tombstone);
- }
- return valid;
-}
-bool mutation_fragment_stream_validator::operator()(mutation_fragment::kind kind, position_in_partition_view pos) {
- return (*this)(to_mutation_fragment_kind_v2(kind), pos, {});
-}

-bool mutation_fragment_stream_validator::operator()(const mutation_fragment_v2& mf) {
- return (*this)(mf.mutation_fragment_kind(), mf.position(),
- mf.is_range_tombstone_change() ? std::optional(mf.as_range_tombstone_change().tombstone()) : std::nullopt);
-}
-bool mutation_fragment_stream_validator::operator()(const mutation_fragment& mf) {
- return (*this)(to_mutation_fragment_kind_v2(mf.mutation_fragment_kind()), mf.position(), {});
-}
+ auto valid = true;

-bool mutation_fragment_stream_validator::operator()(mutation_fragment_v2::kind kind, std::optional<tombstone> new_current_tombstone) {
- bool valid = true;
+ // Check fragment kind order
switch (_prev_kind) {
case mutation_fragment_v2::kind::partition_start:
valid = kind != mutation_fragment_v2::kind::partition_start;
@@ -114,25 +95,81 @@ bool mutation_fragment_stream_validator::operator()(mutation_fragment_v2::kind k
valid = kind == mutation_fragment_v2::kind::partition_start;
break;
}
- if (kind == mutation_fragment_v2::kind::partition_end) {
- valid &= !_current_tombstone;
+ if (!valid) {
+ return false;
+ }
+
+ if (pos && _prev_kind != mutation_fragment_v2::kind::partition_end) {
+ auto cmp = position_in_partition::tri_compare(_schema);
+ auto res = cmp(_prev_pos, *pos);
+ if (_prev_kind == mutation_fragment_v2::kind::range_tombstone_change) {
+ valid = res <= 0;
+ } else {
+ valid = res < 0;
+ }
+ if (!valid) {
+ return false;
+ }
+ }
+
+ _prev_kind = kind;
+ if (pos) {
+ _prev_pos = *pos;
+ } else {
+ switch (kind) {
+ case mutation_fragment_v2::kind::partition_start:
+ _prev_pos = position_in_partition(position_in_partition::partition_start_tag_t{});
+ break;
+ case mutation_fragment_v2::kind::static_row:
+ _prev_pos = position_in_partition(position_in_partition::static_row_tag_t{});
+ break;
+ case mutation_fragment_v2::kind::clustering_row:
+ [[fallthrough]];
+ case mutation_fragment_v2::kind::range_tombstone_change:
+ if (_prev_pos.region() != partition_region::clustered) { // don't move pos if it is already a clustering one
+ _prev_pos = position_in_partition(position_in_partition::before_clustering_row_tag_t{}, clustering_key::make_empty());
+ }
+ break;
+ case mutation_fragment_v2::kind::partition_end:
+ _prev_pos = position_in_partition(position_in_partition::end_of_partition_tag_t{});
+ break;
+ }
}
- if (valid) {
- _prev_kind = kind;
- _current_tombstone = new_current_tombstone.value_or(_current_tombstone);
+ if (new_current_tombstone) {
+ _current_tombstone = *new_current_tombstone;
}
- return valid;
+ return true;
+}
+
+bool mutation_fragment_stream_validator::operator()(mutation_fragment_v2::kind kind, position_in_partition_view pos,
+ std::optional<tombstone> new_current_tombstone) {
+ return validate(kind, pos, new_current_tombstone);
+}
+bool mutation_fragment_stream_validator::operator()(mutation_fragment::kind kind, position_in_partition_view pos) {
+ return validate(to_mutation_fragment_kind_v2(kind), pos, {});
+}
+
+bool mutation_fragment_stream_validator::operator()(const mutation_fragment_v2& mf) {
+ return validate(mf.mutation_fragment_kind(), mf.position(),
+ mf.is_range_tombstone_change() ? std::optional(mf.as_range_tombstone_change().tombstone()) : std::nullopt);
+}
+bool mutation_fragment_stream_validator::operator()(const mutation_fragment& mf) {
+ return validate(to_mutation_fragment_kind_v2(mf.mutation_fragment_kind()), mf.position(), {});
+}
+
+bool mutation_fragment_stream_validator::operator()(mutation_fragment_v2::kind kind, std::optional<tombstone> new_current_tombstone) {
+ return validate(kind, {}, new_current_tombstone);
}
bool mutation_fragment_stream_validator::operator()(mutation_fragment::kind kind) {
- return (*this)(to_mutation_fragment_kind_v2(kind), {});
+ return validate(to_mutation_fragment_kind_v2(kind), {}, {});
}

bool mutation_fragment_stream_validator::on_end_of_stream() {
return _prev_kind == mutation_fragment_v2::kind::partition_end;
}

void mutation_fragment_stream_validator::reset(dht::decorated_key dk) {
- _prev_partition_key = dk;
+ _prev_partition_key = std::move(dk);
_prev_pos = position_in_partition::for_partition_start();
_prev_kind = mutation_fragment_v2::kind::partition_start;
_current_tombstone = {};
diff --git a/test/boost/mutation_fragment_test.cc b/test/boost/mutation_fragment_test.cc
--- a/test/boost/mutation_fragment_test.cc
+++ b/test/boost/mutation_fragment_test.cc
@@ -605,3 +605,41 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_fragment_stream_validator) {
check_invalid_after(rtc, {&ps, &sr});
check_invalid_after(pe, {&sr, &cr, &rtc, &pe});
}
+
+SEASTAR_THREAD_TEST_CASE(test_mutation_fragment_stream_validator_mixed_api_usage) {
+ simple_schema ss;
+
+ const auto dkeys = ss.make_pkeys(3);
+ const auto& dk_ = dkeys[0];
+ const auto& dk0 = dkeys[1];
+ const auto& dk1 = dkeys[2];
+ const auto ck0 = ss.make_ckey(0);
+ const auto ck1 = ss.make_ckey(1);
+ const auto ck2 = ss.make_ckey(2);
+ const auto ck3 = ss.make_ckey(3);
+
+ reader_concurrency_semaphore sem(reader_concurrency_semaphore::for_tests{}, get_name(), 1, 100);
+ auto stop_sem = deferred_stop(sem);
+ auto permit = sem.make_tracking_only_permit(ss.schema().get(), get_name(), db::no_timeout);
+
+ mutation_fragment_stream_validator validator(*ss.schema());
+
+ using mf_kind = mutation_fragment_v2::kind;
+
+ BOOST_REQUIRE(validator(mf_kind::partition_start, {}));
+ BOOST_REQUIRE(validator(dk_.token()));
+ BOOST_REQUIRE(validator(mf_kind::static_row, position_in_partition_view(position_in_partition_view::static_row_tag_t{}), {}));
+ BOOST_REQUIRE(validator(mf_kind::clustering_row, {}));
+ BOOST_REQUIRE(validator(mf_kind::clustering_row, {}));
+ BOOST_REQUIRE(validator(mf_kind::clustering_row, position_in_partition_view::for_key(ck0), {}));
+ BOOST_REQUIRE(validator(mf_kind::clustering_row, {}));
+ BOOST_REQUIRE(!validator(mf_kind::clustering_row, position_in_partition_view::for_key(ck0), {}));
+ BOOST_REQUIRE(validator(mf_kind::clustering_row, {}));
+ BOOST_REQUIRE(validator(mf_kind::clustering_row, position_in_partition_view::for_key(ck1), {}));
+ BOOST_REQUIRE(validator(mf_kind::clustering_row, {}));
+ BOOST_REQUIRE(validator(mf_kind::range_tombstone_change, position_in_partition_view::after_key(ck1), {}));
+ BOOST_REQUIRE(validator(mf_kind::range_tombstone_change, position_in_partition_view::after_key(ck1), {}));
+ BOOST_REQUIRE(validator(mf_kind::partition_end, {}));
+ BOOST_REQUIRE(validator(dk0));
+ BOOST_REQUIRE(!validator(dk0));
+}

Commit Bot

<bot@cloudius-systems.com>
unread,
Oct 3, 2022, 7:26:50 PM10/3/22
to scylladb-dev@googlegroups.com, Botond Dénes
From: Botond Dénes <bde...@scylladb.com>
Committer: Botond Dénes <bde...@scylladb.com>
Branch: master
Reply all
Reply to author
Forward
0 new messages