[QUEUED scylladb next] mutation_fragment_stream_validator: add reset() to validating filter

0 views
Skip to first unread message

Commit Bot

<bot@cloudius-systems.com>
unread,
Oct 3, 2022, 10:24:09 AM10/3/22
to scylladb-dev@googlegroups.com, Botond Dénes
From: Botond Dénes <bde...@scylladb.com>
Committer: Botond Dénes <bde...@scylladb.com>
Branch: next

mutation_fragment_stream_validator: add reset() to validating filter

Allow the high level filtering validator to be reset() to a certain
position, so it can be used in situations where the consumption is not
continuous (fast-forwarding or paging).

---
diff --git a/mutation_fragment_stream_validator.hh b/mutation_fragment_stream_validator.hh
--- a/mutation_fragment_stream_validator.hh
+++ b/mutation_fragment_stream_validator.hh
@@ -108,6 +108,7 @@ public:
/// normally invalid and hence wouldn't advance the internal state. This
/// can be used by users that can correct such invalid streams and wish to
/// continue validating it.
+ void reset(mutation_fragment_v2::kind kind, position_in_partition_view pos, std::optional<tombstone> new_current_tombstone);
void reset(const mutation_fragment&);
void reset(const mutation_fragment_v2&);

@@ -177,6 +178,8 @@ public:
/// Equivalent to `operator()(mf.kind(), mf.position())`
bool operator()(const mutation_fragment_v2& mv);
bool operator()(const mutation_fragment& mv);
+ void reset(mutation_fragment_v2::kind kind, position_in_partition_view pos, std::optional<tombstone> new_current_tombstone);
+ void reset(const mutation_fragment_v2& mf);
/// Equivalent to `operator()(partition_end{})`
bool on_end_of_partition();
void on_end_of_stream();
diff --git a/readers/mutation_reader.cc b/readers/mutation_reader.cc
--- a/readers/mutation_reader.cc
+++ b/readers/mutation_reader.cc
@@ -6,6 +6,8 @@
* SPDX-License-Identifier: AGPL-3.0-or-later
*/

+#include <seastar/util/lazy.hh>
+
#include "readers/flat_mutation_reader_v2.hh"
#include "mutation_rebuilder.hh"
#include "mutation_fragment_stream_validator.hh"
@@ -136,18 +138,18 @@ void mutation_fragment_stream_validator::reset(dht::decorated_key dk) {
_current_tombstone = {};
}

-void mutation_fragment_stream_validator::reset(const mutation_fragment_v2& mf) {
- _prev_pos = mf.position();
- _prev_kind = mf.mutation_fragment_kind();
- if (mf.is_range_tombstone_change()) {
- _current_tombstone = mf.as_range_tombstone_change().tombstone();
- } else {
- _current_tombstone = {};
+void mutation_fragment_stream_validator::reset(mutation_fragment_v2::kind kind, position_in_partition_view pos, std::optional<tombstone> new_current_tombstone) {
+ _prev_pos = pos;
+ _prev_kind = kind;
+ if (new_current_tombstone) {
+ _current_tombstone = *new_current_tombstone;
}
}
+void mutation_fragment_stream_validator::reset(const mutation_fragment_v2& mf) {
+ reset(mf.mutation_fragment_kind(), mf.position(), mf.is_range_tombstone_change() ? std::optional(mf.as_range_tombstone_change().tombstone()) : std::nullopt);
+}
void mutation_fragment_stream_validator::reset(const mutation_fragment& mf) {
- _prev_pos = mf.position();
- _prev_kind = to_mutation_fragment_kind_v2(mf.mutation_fragment_kind());
+ reset(to_mutation_fragment_kind_v2(mf.mutation_fragment_kind()), mf.position(), std::nullopt);
}

namespace {
@@ -250,6 +252,26 @@ bool mutation_fragment_stream_validating_filter::operator()(const mutation_fragm
return (*this)(to_mutation_fragment_kind_v2(mv.mutation_fragment_kind()), mv.position(), {});
}

+void mutation_fragment_stream_validating_filter::reset(mutation_fragment_v2::kind kind, position_in_partition_view pos,
+ std::optional<tombstone> new_current_tombstone) {
+ mrlog.debug("[validator {}] reset to {} @ {}{}", static_cast<const void*>(this), kind, pos, value_of([t = new_current_tombstone] () -> sstring {
+ if (!t) {
+ return "";
+ }
+ return format(" (new tombstone: {})", *t);
+ }));
+ _validator.reset(kind, pos, new_current_tombstone);
+}
+void mutation_fragment_stream_validating_filter::reset(const mutation_fragment_v2& mf) {
+ mrlog.debug("[validator {}] reset to {} @ {}{}", static_cast<const void*>(this), mf.mutation_fragment_kind(), mf.position(), value_of([&mf] () -> sstring {
+ if (!mf.is_range_tombstone_change()) {
+ return "";
+ }
+ return format(" (new tombstone: {})", mf.as_range_tombstone_change().tombstone());
+ }));
+ _validator.reset(mf);
+}
+
bool mutation_fragment_stream_validating_filter::on_end_of_partition() {
return (*this)(mutation_fragment::kind::partition_end, position_in_partition_view(position_in_partition_view::end_of_partition_tag_t()));
}

Commit Bot

<bot@cloudius-systems.com>
unread,
Oct 3, 2022, 7:26:48 PM10/3/22
to scylladb-dev@googlegroups.com, Botond Dénes
From: Botond Dénes <bde...@scylladb.com>
Committer: Botond Dénes <bde...@scylladb.com>
Branch: master
Reply all
Reply to author
Forward
0 new messages