[PATCH v1 00/15] Use xxHash for digest instead of MD5

257 views
Skip to first unread message

Duarte Nunes

<duarte@scylladb.com>
unread,
Nov 30, 2017, 6:00:42 PM11/30/17
to scylladb-dev@googlegroups.com
The MD5 hash function has proved to be slow for large cell values:

size = 256; elapsed = 4us
size = 512; elapsed = 8us
size = 1024; elapsed = 14us
size = 2048; elapsed = 21us
size = 4096; elapsed = 33us
size = 8192; elapsed = 51us
size = 16384; elapsed = 86us
size = 32768; elapsed = 150us
size = 65536; elapsed = 278us
size = 131072; elapsed = 531us
size = 262144; elapsed = 1032us
size = 524288; elapsed = 2026us
size = 1048576; elapsed = 4004us
size = 2097152; elapsed = 7943us
size = 4194304; elapsed = 15800us
size = 8388608; elapsed = 31731us
size = 16777216; elapsed = 64681us
size = 33554432; elapsed = 130752us
size = 67108864; elapsed = 263154us

The xxHash is a non-cryptographic, 64bit (there's work in progress on
the 128 version) hash that can be used to replace MD5. It performs much
better:

size = 256; elapsed = 4us
size = 512; elapsed = 8us
size = 1024; elapsed = 14us
size = 2048; elapsed = 21us
size = 4096; elapsed = 33us
size = 8192; elapsed = 51us
size = 16384; elapsed = 86us
size = 32768; elapsed = 150us
size = 65536; elapsed = 278us
size = 131072; elapsed = 531us
size = 262144; elapsed = 1032us
size = 524288; elapsed = 2026us
size = 1048576; elapsed = 4004us
size = 2097152; elapsed = 7943us
size = 4194304; elapsed = 15800us
size = 8388608; elapsed = 31731us
size = 16777216; elapsed = 64681us
size = 33554432; elapsed = 130752us
size = 67108864; elapsed = 263154us

Using a normal cassandra-stress run, one can already see a (small)
improvement. With MD5:

Results:
op rate : 16007 [READ:16007]
latency mean : 6.2 [READ:6.2]
latency median : 5.3 [READ:5.3]
latency 95th percentile : 11.6 [READ:11.6]
latency 99th percentile : 13.7 [READ:13.7]
latency 99.9th percentile : 16.0 [READ:16.0]
latency max : 27.5 [READ:27.5]
Total partitions : 10000000 [READ:10000000]
Total operation time : 00:10:24

With xxHash:

Results:
op rate : 17643 [READ:17643]
latency mean : 5.7 [READ:5.7]
latency median : 5.0 [READ:5.0]
latency 95th percentile : 10.0 [READ:10.0]
latency 99th percentile : 12.0 [READ:12.0]
latency 99.9th percentile : 14.1 [READ:14.1]
latency max : 26.5 [READ:26.5]
Total partitions : 10000000 [READ:10000000]
Total operation time : 00:09:26

Fixes #2884

Also in:
Also in:
g...@github.com:duarten/scylla.git xxhash/v1
https://github.com/duarten/scylla/tree/xxhash/v1

Sad note: I wish I knew how to configure cassandra-stress to use larger
values.

Duarte Nunes (15):
Add xxhash (fast non-cryptographic hash) as submodule
configure.py: Build xxhash
CMakeLists: Add xxhash directory
digest: Introduce xxHash hash algorithm
digest_algorithm: Add xxHash option
md5_hasher: Extract hash size
query: Add class to encapsulate digest algorithm
query-result: Introduce class digester
query-result: Use digester instead of md5_hasher
storage_proxy: Extract decision about digest algorithm to use
message/messaging_service: Specify algorithm when requesting digest
service/storage_service: Add and use xxhash feature
schema: Remove unneeded include
tests/mutation_test: Test xx_hasher alongside md5_hasher
tests/mutation_test: Use xxHash instead of MD5 for some tests

configure.py | 48 +++++++++---
database.hh | 5 +-
digest_algorithm.hh | 5 +-
digester.hh | 173 +++++++++++++++++++++++++++++++++++++++++++
idl/query.idl.hh | 3 +-
md5_hasher.hh | 8 +-
message/messaging_service.hh | 4 +-
mutation.hh | 4 +-
mutation_query.hh | 2 +-
query-result-writer.hh | 17 +++--
query-result.hh | 18 ++++-
service/storage_proxy.hh | 3 +-
service/storage_service.hh | 6 ++
xx_hasher.hh | 63 ++++++++++++++++
database.cc | 16 ++--
message/messaging_service.cc | 6 +-
mutation.cc | 8 +-
mutation_partition.cc | 2 +-
mutation_query.cc | 4 +-
query-result-set.cc | 2 +-
schema.cc | 1 -
service/storage_proxy.cc | 56 +++++++-------
service/storage_service.cc | 3 +
tests/database_test.cc | 6 +-
tests/memory_footprint.cc | 2 +-
tests/mutation_test.cc | 44 ++++++-----
.gitmodules | 3 +
CMakeLists.txt | 1 +
xxHash | 1 +
29 files changed, 410 insertions(+), 104 deletions(-)
create mode 100644 digester.hh
create mode 100644 xx_hasher.hh
create mode 160000 xxHash

--
2.15.0

Duarte Nunes

<duarte@scylladb.com>
unread,
Nov 30, 2017, 6:00:43 PM11/30/17
to scylladb-dev@googlegroups.com
Signed-off-by: Duarte Nunes <dua...@scylladb.com>
---
.gitmodules | 3 +++
xxHash | 1 +
2 files changed, 4 insertions(+)
create mode 160000 xxHash

diff --git a/.gitmodules b/.gitmodules
index 18e14ee560..6af0a36401 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -9,3 +9,6 @@
[submodule "dist/ami/files/scylla-ami"]
path = dist/ami/files/scylla-ami
url = ../scylla-ami
+[submodule "xxHash"]
+ path = xxHash
+ url = g...@github.com:Cyan4973/xxHash.git
diff --git a/xxHash b/xxHash
new file mode 160000
index 0000000000..bc32865a23
--- /dev/null
+++ b/xxHash
@@ -0,0 +1 @@
+Subproject commit bc32865a237f15cd4aaaa010b4f6a4f2fae71367
--
2.15.0

Duarte Nunes

<duarte@scylladb.com>
unread,
Nov 30, 2017, 6:00:45 PM11/30/17
to scylladb-dev@googlegroups.com
Signed-off-by: Duarte Nunes <dua...@scylladb.com>
---
configure.py | 48 +++++++++++++++++++++++++++++++++++++++---------
1 file changed, 39 insertions(+), 9 deletions(-)

diff --git a/configure.py b/configure.py
index 5744fd5ee1..e6cb6f6397 100755
--- a/configure.py
+++ b/configure.py
@@ -25,6 +25,8 @@ from distutils.spawn import find_executable

configure_args = str.join(' ', [shlex.quote(x) for x in sys.argv[1:]])

+srcdir = os.getcwd()
+
for line in open('/etc/os-release'):
key, _, value = line.partition('=')
value = value.strip().strip('"')
@@ -156,12 +158,14 @@ modes = {
'sanitize_libs': '-lasan -lubsan',
'opt': '-O0 -DDEBUG -DDEBUG_SHARED_PTR -DDEFAULT_ALLOCATOR',
'libs': '',
+ 'xxhash_opts': '-DBUILD_SHARED_LIBS=OFF -DBUILD_XXHSUM=ON',
},
'release': {
'sanitize': '',
'sanitize_libs': '',
'opt': '-O2',
'libs': '',
+ 'xxhash_opts': '-DBUILD_SHARED_LIBS=OFF -DBUILD_XXHSUM=ON',
},
}

@@ -846,6 +850,22 @@ libs = ' '.join(['-lyaml-cpp', '-llz4', '-lz', '-lsnappy', pkg_config("--libs",
maybe_static(args.staticboost, '-lboost_date_time'),
])

+# xxhash lib
+xxhash_dir = 'xxHash'
+xxhash_lib = 'xxhash-scylla'
+xxhash_src_lib = xxhash_dir + '/libxxhash.a'
+
+if not os.path.exists(xxhash_dir) or not os.listdir(xxhash_dir):
+ raise Exception(xxhash_dir + ' is empty. Run "git submodule update --init".')
+
+xxhash_sources = []
+for root, dirs, files in os.walk(xxhash_dir):
+ xxhash_sources += [os.path.join(root, file)
+ for file in files
+ if file.endswith('.h') or file.endswith('.c')]
+xxhash_sources = ' '.join(xxhash_sources)
+libs += ' -l' + xxhash_lib
+
if not args.staticboost:
args.user_cflags += ' -DBOOST_TEST_DYN_LINK'

@@ -903,17 +923,17 @@ with open(buildfile, 'w') as f:
for mode in build_modes:
modeval = modes[mode]
f.write(textwrap.dedent('''\
- cxxflags_{mode} = -I. -I $builddir/{mode}/gen -I seastar -I seastar/build/{mode}/gen
+ cxxflags_{mode} = -I. -I $builddir/{mode}/gen -I seastar -I seastar/build/{mode}/gen -I$full_builddir/{mode}/xxhash
rule cxx.{mode}
command = $cxx -MD -MT $out -MF $out.d {seastar_cflags} $cxxflags $cxxflags_{mode} $obj_cxxflags -c -o $out $in
description = CXX $out
depfile = $out.d
rule link.{mode}
- command = $cxx $cxxflags_{mode} {sanitize_libs} $ldflags {seastar_libs} -o $out $in $libs $libs_{mode}
+ command = $cxx $cxxflags_{mode} {sanitize_libs} -L$builddir/{mode} $ldflags {seastar_libs} -o $out $in $libs $libs_{mode}
description = LINK $out
pool = link_pool
rule link_stripped.{mode}
- command = $cxx $cxxflags_{mode} -s {sanitize_libs} $ldflags {seastar_libs} -o $out $in $libs $libs_{mode}
+ command = $cxx $cxxflags_{mode} -s {sanitize_libs} -L$builddir/{mode} $ldflags {seastar_libs} -o $out $in $libs $libs_{mode}
description = LINK (stripped) $out
pool = link_pool
rule ar.{mode}
@@ -936,6 +956,15 @@ with open(buildfile, 'w') as f:
build/{mode}/gen/${{stem}}Parser.cpp
description = ANTLR3 $in
''').format(mode = mode, **modeval))
+ f.write(textwrap.dedent('''\
+ rule xxhashmake_{mode}
+ command = make -C $builddir/{mode}/{xxhash_dir} CC={args.cc}
+ rule xxhashcmake_{mode}
+ command = mkdir -p $builddir/{mode}/{xxhash_dir} && cd $builddir/{mode}/{xxhash_dir} && CC={args.cc} cmake {xxhash_opts} {srcdir}/$in/cmake_unofficial
+ build $builddir/{mode}/{xxhash_dir}/Makefile : xxhashcmake_{mode} {xxhash_dir}
+ build $builddir/{mode}/{xxhash_src_lib} : xxhashmake_{mode} $builddir/{mode}/{xxhash_dir}/Makefile | {xxhash_sources}
+ build $builddir/{mode}/lib{xxhash_lib}.a : copy $builddir/{mode}/{xxhash_src_lib}
+ ''').format(xxhash_opts=(modeval['xxhash_opts']), **globals()))
f.write('build {mode}: phony {artifacts}\n'.format(mode = mode,
artifacts = str.join(' ', ('$builddir/' + mode + '/' + x for x in build_artifacts))))
compiles = {}
@@ -961,6 +990,10 @@ with open(buildfile, 'w') as f:
if binary.endswith('.a'):
f.write('build $builddir/{}/{}: ar.{} {}\n'.format(mode, binary, mode, str.join(' ', objs)))
else:
+ libdeps = str.join(' ', [
+ 'seastar/build/{mode}/libseastar.a'.format(mode = mode),
+ '$builddir/{mode}/lib{xxhash_lib}.a'.format(**globals()),
+ ])
if binary.startswith('tests/'):
local_libs = '$libs'
if binary not in tests_not_using_seastar_test_framework or binary in pure_boost_tests:
@@ -972,15 +1005,12 @@ with open(buildfile, 'w') as f:
# So we strip the tests by default; The user can very
# quickly re-link the test unstripped by adding a "_g"
# to the test name, e.g., "ninja build/release/testname_g"
- f.write('build $builddir/{}/{}: {}.{} {} {}\n'.format(mode, binary, tests_link_rule, mode, str.join(' ', objs),
- 'seastar/build/{}/libseastar.a'.format(mode)))
+ f.write('build $builddir/{}/{}: {}.{} {} | {}\n'.format(mode, binary, tests_link_rule, mode, str.join(' ', objs), libdeps))
f.write(' libs = {}\n'.format(local_libs))
- f.write('build $builddir/{}/{}_g: link.{} {} {}\n'.format(mode, binary, mode, str.join(' ', objs),
- 'seastar/build/{}/libseastar.a'.format(mode)))
+ f.write('build $builddir/{}/{}_g: link.{} {} | {}\n'.format(mode, binary, mode, str.join(' ', objs), libdeps))
f.write(' libs = {}\n'.format(local_libs))
else:
- f.write('build $builddir/{}/{}: link.{} {} {}\n'.format(mode, binary, mode, str.join(' ', objs),
- 'seastar/build/{}/libseastar.a'.format(mode)))
+ f.write('build $builddir/{}/{}: link.{} {} | {}\n'.format(mode, binary, mode, str.join(' ', objs), libdeps))
if has_thrift:
f.write(' libs = {} {} $libs\n'.format(thrift_libs, maybe_static(args.staticboost, '-lboost_system')))
for src in srcs:
--
2.15.0

Duarte Nunes

<duarte@scylladb.com>
unread,
Nov 30, 2017, 6:00:46 PM11/30/17
to scylladb-dev@googlegroups.com
Signed-off-by: Duarte Nunes <dua...@scylladb.com>
---
CMakeLists.txt | 1 +
1 file changed, 1 insertion(+)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 86711c8ecd..0687def076 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -137,4 +137,5 @@ target_include_directories(scylla PUBLIC
${SEASTAR_DPDK_INCLUDE_DIRS}
${SEASTAR_INCLUDE_DIRS}
${Boost_INCLUDE_DIRS}
+ xxhash
build/release/gen)
--
2.15.0

Duarte Nunes

<duarte@scylladb.com>
unread,
Nov 30, 2017, 6:00:47 PM11/30/17
to scylladb-dev@googlegroups.com
This patch introduces xx_hasher, a class conforming to the Hasher
concept, which will be used to calculate the data digest in subsequent
patches. It is expected to be an order of magnitude faster than md5.

We use the 64 bit variant of the algorithm, the 128 bit one still
being under development.

Signed-off-by: Duarte Nunes <dua...@scylladb.com>
---
xx_hasher.hh | 63 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 63 insertions(+)
create mode 100644 xx_hasher.hh

diff --git a/xx_hasher.hh b/xx_hasher.hh
new file mode 100644
index 0000000000..0080e796a1
--- /dev/null
+++ b/xx_hasher.hh
@@ -0,0 +1,63 @@
+/*
+ * Copyright (C) 2017 ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include "bytes.hh"
+#include <xxHash/xxhash.h>
+#include <memory>
+
+class xx_hasher {
+ struct xxhash_state_deleter {
+ void operator()(XXH64_state_t* x) const { XXH64_freeState(x); }
+ };
+
+ std::unique_ptr<XXH64_state_t, xxhash_state_deleter> _state;
+public:
+ xx_hasher() : _state(std::unique_ptr<XXH64_state_t, xxhash_state_deleter>(XXH64_createState())) {
+ XXH64_reset(_state.get(), 0);
+ }
+
+ xx_hasher(xx_hasher&&) noexcept = default;
+
+ xx_hasher& operator=(xx_hasher&&) noexcept = default;
+
+ xx_hasher(const xx_hasher& other) : xx_hasher() {
+ XXH64_copyState(_state.get(), other._state.get());
+ }
+
+ xx_hasher& operator=(const xx_hasher& other) {
+ if (this != &other) {
+ auto tmp = other;
+ this->~xx_hasher();
+ new (this) xx_hasher(std::move(tmp));
+ }
+ return *this;
+ }
+
+ void update(const char* ptr, size_t length) {
+ XXH64_update(_state.get(), ptr, length);
+ }
+
+ uint64_t finalize() {
+ return XXH64_digest(_state.get());
+ }
+};
--
2.15.0

Duarte Nunes

<duarte@scylladb.com>
unread,
Nov 30, 2017, 6:00:48 PM11/30/17
to scylladb-dev@googlegroups.com
Signed-off-by: Duarte Nunes <dua...@scylladb.com>
---
digest_algorithm.hh | 1 +
service/storage_proxy.cc | 2 +-
2 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/digest_algorithm.hh b/digest_algorithm.hh
index 966f18da43..26e31b0c11 100644
--- a/digest_algorithm.hh
+++ b/digest_algorithm.hh
@@ -26,6 +26,7 @@ namespace query {
enum class digest_algorithm : uint8_t {
none = 0, // digest not required
MD5 = 1, // default algorithm
+ xxHash = 2,
};

}
diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc
index d392048c2e..0caa1b4134 100644
--- a/service/storage_proxy.cc
+++ b/service/storage_proxy.cc
@@ -3757,7 +3757,7 @@ void storage_proxy::init_messaging_service() {
case query::digest_algorithm::none:
qrr = query::result_request::only_result;
break;
- case query::digest_algorithm::MD5:
+ default:
qrr = query::result_request::result_and_digest;
break;
}
--
2.15.0

Duarte Nunes

<duarte@scylladb.com>
unread,
Nov 30, 2017, 6:00:49 PM11/30/17
to scylladb-dev@googlegroups.com
Signed-off-by: Duarte Nunes <dua...@scylladb.com>
---
md5_hasher.hh | 8 +++++---
1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/md5_hasher.hh b/md5_hasher.hh
index 0de64e5b33..379aff14ac 100644
--- a/md5_hasher.hh
+++ b/md5_hasher.hh
@@ -30,19 +30,21 @@
class md5_hasher {
CryptoPP::Weak::MD5 hash{};
public:
+ static constexpr size_t size = CryptoPP::Weak::MD5::DIGESTSIZE;
+
void update(const char* ptr, size_t length) {
static_assert(sizeof(char) == sizeof(byte), "Assuming lengths will be the same");
hash.Update(reinterpret_cast<const byte*>(ptr), length * sizeof(byte));
}

bytes finalize() {
- bytes digest{bytes::initialized_later(), CryptoPP::Weak::MD5::DIGESTSIZE};
+ bytes digest{bytes::initialized_later(), size};
hash.Final(reinterpret_cast<unsigned char*>(digest.begin()));
return digest;
}

- std::array<uint8_t, CryptoPP::Weak::MD5::DIGESTSIZE> finalize_array() {
- std::array<uint8_t, CryptoPP::Weak::MD5::DIGESTSIZE> array;
+ std::array<uint8_t, size> finalize_array() {
+ std::array<uint8_t, size> array;
hash.Final(reinterpret_cast<unsigned char*>(array.data()));
return array;
}
--
2.15.0

Duarte Nunes

<duarte@scylladb.com>
unread,
Nov 30, 2017, 6:00:50 PM11/30/17
to scylladb-dev@googlegroups.com
This patch paves the way for us to encapsulate the actual digest
algorithm used for a query. The digester class dispatches to a
concrete implementation based on the digest algorithm being used. It
wraps the xxHash algorithm to provide a 128 bit hash, which is the
size of digest expected by the inter-node protocol.

Signed-off-by: Duarte Nunes <dua...@scylladb.com>
---
digest_algorithm.hh | 2 +
digester.hh | 173 ++++++++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 175 insertions(+)
create mode 100644 digester.hh

diff --git a/digest_algorithm.hh b/digest_algorithm.hh
index 26e31b0c11..9a7a35cd7d 100644
--- a/digest_algorithm.hh
+++ b/digest_algorithm.hh
@@ -23,6 +23,8 @@

namespace query {

+constexpr size_t digest_size = 16;
+
enum class digest_algorithm : uint8_t {
none = 0, // digest not required
MD5 = 1, // default algorithm
diff --git a/digester.hh b/digester.hh
new file mode 100644
index 0000000000..651ece024f
--- /dev/null
+++ b/digester.hh
@@ -0,0 +1,173 @@
+/*
+ * Copyright (C) 2017 ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include "digest_algorithm.hh"
+#include "md5_hasher.hh"
+#include "utils/serialization.hh"
+#include "xx_hasher.hh"
+
+namespace query {
+
+class digester final {
+ static_assert(md5_hasher::size == digest_size, "MD5 hash size needs to match the digest size");
+
+ struct xx_hasher_wrapper : public xx_hasher {
+ bytes finalize() {
+ bytes digest{bytes::initialized_later(), digest_size};
+ serialize_to(digest.begin());
+ return digest;
+ }
+
+ std::array<uint8_t, digest_size> finalize_array() {
+ std::array<uint8_t, digest_size> digest;
+ serialize_to(digest.begin());
+ return digest;
+ }
+
+ template<typename OutIterator>
+ void serialize_to(OutIterator&& out) {
+ serialize_int64(out, 0);
+ serialize_int64(out, xx_hasher::finalize());
+ }
+ };
+
+ union concrete_hasher {
+ md5_hasher md5;
+ xx_hasher_wrapper xx;
+
+ concrete_hasher() { }
+ ~concrete_hasher() { }
+ } _impl;
+ digest_algorithm _algo;
+public:
+ explicit digester(digest_algorithm algo)
+ : _algo(algo) {
+ switch (_algo) {
+ case digest_algorithm::MD5:
+ new (&_impl.md5) md5_hasher();
+ break;
+ case digest_algorithm::xxHash:
+ new (&_impl.xx) xx_hasher();
+ break;
+ case digest_algorithm ::none:
+ break;
+ }
+ }
+
+ ~digester() {
+ switch (_algo) {
+ case digest_algorithm::MD5:
+ _impl.md5.~md5_hasher();
+ break;
+ case digest_algorithm::xxHash:
+ _impl.xx.~xx_hasher();
+ break;
+ case digest_algorithm ::none:
+ break;
+ }
+ }
+
+ digester(digester&& other) noexcept
+ : _algo(other._algo) {
+ switch (_algo) {
+ case digest_algorithm::MD5:
+ new (&_impl.md5) md5_hasher(std::move(other._impl.md5));
+ break;
+ case digest_algorithm::xxHash:
+ new (&_impl.xx) xx_hasher(std::move(other._impl.xx));
+ break;
+ case digest_algorithm ::none:
+ break;
+ }
+ }
+
+ digester(const digester& other) noexcept
+ : _algo(other._algo) {
+ switch (_algo) {
+ case digest_algorithm::MD5:
+ new (&_impl.md5) md5_hasher(other._impl.md5);
+ break;
+ case digest_algorithm::xxHash:
+ new (&_impl.xx) xx_hasher(other._impl.xx);
+ break;
+ case digest_algorithm ::none:
+ break;
+ }
+ }
+
+ digester& operator=(digester&& other) noexcept {
+ if (this != &other) {
+ this->~digester();
+ new (this) digester(std::move(other));
+ }
+ return *this;
+ }
+
+ digester& operator=(const digester& other) noexcept {
+ if (this != &other) {
+ auto tmp = other;
+ this->~digester();
+ new (this) digester(std::move(tmp));
+ }
+ return *this;
+ }
+
+ void update(const char* ptr, size_t length) {
+ switch (_algo) {
+ case digest_algorithm::MD5:
+ _impl.md5.update(ptr, length);
+ break;
+ case digest_algorithm::xxHash:
+ _impl.xx.update(ptr, length);
+ break;
+ case digest_algorithm ::none:
+ break;
+ }
+ }
+
+ bytes finalize() {
+ switch (_algo) {
+ case digest_algorithm::MD5:
+ return _impl.md5.finalize();
+ case digest_algorithm::xxHash:
+ return _impl.xx.finalize();
+ case digest_algorithm ::none:
+ return bytes();
+ }
+ abort();
+ }
+
+ std::array<uint8_t, digest_size> finalize_array() {
+ switch (_algo) {
+ case digest_algorithm::MD5:
+ return _impl.md5.finalize_array();
+ case digest_algorithm::xxHash:
+ return _impl.xx.finalize_array();
+ case digest_algorithm ::none:
+ return { };
+ }
+ abort();
+ }
+};
+
+}
\ No newline at end of file
--
2.15.0

Duarte Nunes

<duarte@scylladb.com>
unread,
Nov 30, 2017, 6:00:51 PM11/30/17
to scylladb-dev@googlegroups.com
Use the digester class instead of md5_hasher to encapsulate the
decision of which hash algorithm to use.

Signed-off-by: Duarte Nunes <dua...@scylladb.com>
---
query-result-writer.hh | 13 +++++++------
query-result.hh | 4 ----
mutation_partition.cc | 2 +-
3 files changed, 8 insertions(+), 11 deletions(-)

diff --git a/query-result-writer.hh b/query-result-writer.hh
index afaed6de0e..c91119bd8d 100644
--- a/query-result-writer.hh
+++ b/query-result-writer.hh
@@ -26,7 +26,7 @@
#include "query-request.hh"
#include "query-result.hh"
#include "digest_algorithm.hh"
-
+#include "digester.hh"
#include "idl/uuid.dist.hh"
#include "idl/keys.dist.hh"
#include "idl/query.dist.hh"
@@ -48,8 +48,8 @@ class result::partition_writer {
const clustering_row_ranges& _ranges;
ser::query_result__partitions<bytes_ostream>& _pw;
ser::vector_position _pos;
- md5_hasher& _digest;
- md5_hasher _digest_pos;
+ digester& _digest;
+ digester _digest_pos;
uint32_t& _row_count;
uint32_t& _partition_count;
api::timestamp_type& _last_modified;
@@ -61,7 +61,7 @@ class result::partition_writer {
ser::query_result__partitions<bytes_ostream>& pw,
ser::vector_position pos,
ser::after_qr_partition__key<bytes_ostream> w,
- md5_hasher& digest,
+ digester& digest,
uint32_t& row_count,
uint32_t& partition_count,
api::timestamp_type& last_modified)
@@ -104,7 +104,7 @@ class result::partition_writer {
const partition_slice& slice() const {
return _slice;
}
- md5_hasher& digest() {
+ digester& digest() {
return _digest;
}
uint32_t& row_count() {
@@ -121,7 +121,6 @@ class result::partition_writer {

class result::builder {
bytes_ostream _out;
- md5_hasher _digest;
const partition_slice& _slice;
ser::query_result__partitions<bytes_ostream> _w;
result_request _request;
@@ -129,12 +128,14 @@ class result::builder {
uint32_t _partition_count = 0;
api::timestamp_type _last_modified = api::missing_timestamp;
short_read _short_read;
+ digester _digest;
result_memory_accounter _memory_accounter;
public:
builder(const partition_slice& slice, result_options options, result_memory_accounter memory_accounter)
: _slice(slice)
, _w(ser::writer_of_query_result<bytes_ostream>(_out).start_partitions())
, _request(options.request)
+ , _digest(digester(options.digest_algo))
, _memory_accounter(std::move(memory_accounter))
{ }
builder(builder&&) = delete; // _out is captured by reference
diff --git a/query-result.hh b/query-result.hh
index 21270f4e35..fd5ce64d63 100644
--- a/query-result.hh
+++ b/query-result.hh
@@ -21,12 +21,9 @@

#pragma once

-#define CRYPTOPP_ENABLE_NAMESPACE_WEAK 1
-#include <cryptopp/md5.h>
#include "bytes_ostream.hh"
#include "digest_algorithm.hh"
#include "query-request.hh"
-#include "md5_hasher.hh"
#include <experimental/optional>
#include <seastar/util/bool_class.hh>
#include "seastarx.hh"
@@ -284,7 +281,6 @@ struct result_options {

class result_digest {
public:
- static_assert(16 == CryptoPP::Weak::MD5::DIGESTSIZE, "MD5 digest size is all wrong");
using type = std::array<uint8_t, 16>;
private:
type _digest;
diff --git a/mutation_partition.cc b/mutation_partition.cc
index 3831d8c623..c15428dad4 100644
--- a/mutation_partition.cc
+++ b/mutation_partition.cc
@@ -606,7 +606,7 @@ void write_counter_cell(RowWriter& w, const query::partition_slice& slice, ::ato
}

// returns the timestamp of a latest update to the row
-static api::timestamp_type hash_row_slice(md5_hasher& hasher,
+static api::timestamp_type hash_row_slice(query::digester& hasher,
const schema& s,
column_kind kind,
const row& cells,
--
2.15.0

Duarte Nunes

<duarte@scylladb.com>
unread,
Nov 30, 2017, 6:00:51 PM11/30/17
to scylladb-dev@googlegroups.com
Introduce class result_options to carry result options through the
request pipeline, which at this point mean the result type and the
digest algorithm. This class allows us to encapsulate the concrete
digest algorithm to use.

Signed-off-by: Duarte Nunes <dua...@scylladb.com>
---
database.hh | 5 +++--
mutation.hh | 4 ++--
mutation_query.hh | 2 +-
query-result-writer.hh | 4 ++--
query-result.hh | 14 ++++++++++++++
service/storage_proxy.hh | 2 +-
database.cc | 16 ++++++++--------
mutation.cc | 8 ++++----
mutation_query.cc | 4 ++--
query-result-set.cc | 2 +-
service/storage_proxy.cc | 35 +++++++++++++++--------------------
tests/database_test.cc | 6 +++---
tests/memory_footprint.cc | 2 +-
tests/mutation_test.cc | 7 ++++---
14 files changed, 61 insertions(+), 50 deletions(-)

diff --git a/database.hh b/database.hh
index 61d4ee70cb..78d980bdc9 100644
--- a/database.hh
+++ b/database.hh
@@ -638,7 +638,8 @@ class column_family : public enable_lw_shared_from_this<column_family> {

// Returns at most "cmd.limit" rows
future<lw_shared_ptr<query::result>> query(schema_ptr,
- const query::read_command& cmd, query::result_request request,
+ const query::read_command& cmd,
+ query::result_options opts,
const dht::partition_range_vector& ranges,
tracing::trace_state_ptr trace_state,
query::result_memory_limiter& memory_limiter,
@@ -1170,7 +1171,7 @@ class database {
unsigned shard_of(const dht::token& t);
unsigned shard_of(const mutation& m);
unsigned shard_of(const frozen_mutation& m);
- future<lw_shared_ptr<query::result>, cache_temperature> query(schema_ptr, const query::read_command& cmd, query::result_request request, const dht::partition_range_vector& ranges,
+ future<lw_shared_ptr<query::result>, cache_temperature> query(schema_ptr, const query::read_command& cmd, query::result_options opts, const dht::partition_range_vector& ranges,
tracing::trace_state_ptr trace_state, uint64_t max_result_size);
future<reconcilable_result, cache_temperature> query_mutations(schema_ptr, const query::read_command& cmd, const dht::partition_range& range,
query::result_memory_accounter&& accounter, tracing::trace_state_ptr trace_state);
diff --git a/mutation.hh b/mutation.hh
index 318d15e82c..fdeb954b25 100644
--- a/mutation.hh
+++ b/mutation.hh
@@ -107,14 +107,14 @@ class mutation final {
public:
// The supplied partition_slice must be governed by this mutation's schema
query::result query(const query::partition_slice&,
- query::result_request request = query::result_request::only_result,
+ query::result_options opts = query::result_options::only_result(),
gc_clock::time_point now = gc_clock::now(),
uint32_t row_limit = query::max_rows) &&;

// The supplied partition_slice must be governed by this mutation's schema
// FIXME: Slower than the r-value version
query::result query(const query::partition_slice&,
- query::result_request request = query::result_request::only_result,
+ query::result_options opts = query::result_options::only_result(),
gc_clock::time_point now = gc_clock::now(),
uint32_t row_limit = query::max_rows) const&;

diff --git a/mutation_query.hh b/mutation_query.hh
index 6cfdd1dc80..5b53b60a3c 100644
--- a/mutation_query.hh
+++ b/mutation_query.hh
@@ -105,7 +105,7 @@ class reconcilable_result {
printer pretty_printer(schema_ptr) const;
};

-query::result to_data_query_result(const reconcilable_result&, schema_ptr, const query::partition_slice&, uint32_t row_limit, uint32_t partition_limit, query::result_request result_type = query::result_request::only_result);
+query::result to_data_query_result(const reconcilable_result&, schema_ptr, const query::partition_slice&, uint32_t row_limit, uint32_t partition_limit, query::result_options opts = query::result_options::only_result());

// Performs a query on given data source returning data in reconcilable form.
//
diff --git a/query-result-writer.hh b/query-result-writer.hh
index 664bf377e2..afaed6de0e 100644
--- a/query-result-writer.hh
+++ b/query-result-writer.hh
@@ -131,10 +131,10 @@ class result::builder {
short_read _short_read;
result_memory_accounter _memory_accounter;
public:
- builder(const partition_slice& slice, result_request request, result_memory_accounter memory_accounter)
+ builder(const partition_slice& slice, result_options options, result_memory_accounter memory_accounter)
: _slice(slice)
, _w(ser::writer_of_query_result<bytes_ostream>(_out).start_partitions())
- , _request(request)
+ , _request(options.request)
, _memory_accounter(std::move(memory_accounter))
{ }
builder(builder&&) = delete; // _out is captured by reference
diff --git a/query-result.hh b/query-result.hh
index 4e4681e86c..21270f4e35 100644
--- a/query-result.hh
+++ b/query-result.hh
@@ -24,6 +24,7 @@
#define CRYPTOPP_ENABLE_NAMESPACE_WEAK 1
#include <cryptopp/md5.h>
#include "bytes_ostream.hh"
+#include "digest_algorithm.hh"
#include "query-request.hh"
#include "md5_hasher.hh"
#include <experimental/optional>
@@ -268,6 +269,19 @@ enum class result_request {
result_and_digest,
};

+struct result_options {
+ result_request request = result_request::only_result;
+ digest_algorithm digest_algo = query::digest_algorithm::none;
+
+ static result_options only_result() {
+ return result_options{};
+ }
+
+ static result_options only_digest(digest_algorithm da) {
+ return {result_request::only_digest, da};
+ }
+};
+
class result_digest {
public:
static_assert(16 == CryptoPP::Weak::MD5::DIGESTSIZE, "MD5 digest size is all wrong");
diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh
index 0bb4f50332..ddc5f27926 100644
--- a/service/storage_proxy.hh
+++ b/service/storage_proxy.hh
@@ -248,7 +248,7 @@ class storage_proxy : public seastar::async_sharded_service<storage_proxy> /*imp
db::read_repair_decision new_read_repair_decision(const schema& s);
::shared_ptr<abstract_read_executor> get_read_executor(lw_shared_ptr<query::read_command> cmd, dht::partition_range pr, db::consistency_level cl, tracing::trace_state_ptr trace_state);
future<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature> query_result_local(schema_ptr, lw_shared_ptr<query::read_command> cmd, const dht::partition_range& pr,
- query::result_request request,
+ query::result_options opts,
tracing::trace_state_ptr trace_state,
uint64_t max_size = query::result_memory_limiter::maximum_result_size);
future<query::result_digest, api::timestamp_type, cache_temperature> query_result_local_digest(schema_ptr, lw_shared_ptr<query::read_command> cmd, const dht::partition_range& pr, tracing::trace_state_ptr trace_state,
diff --git a/database.cc b/database.cc
index f5e7bcde68..31db41a845 100644
--- a/database.cc
+++ b/database.cc
@@ -2876,12 +2876,12 @@ compare_atomic_cell_for_merge(atomic_cell_view left, atomic_cell_view right) {
struct query_state {
explicit query_state(schema_ptr s,
const query::read_command& cmd,
- query::result_request request,
+ query::result_options opts,
const dht::partition_range_vector& ranges,
query::result_memory_accounter memory_accounter = { })
: schema(std::move(s))
, cmd(cmd)
- , builder(cmd.slice, request, std::move(memory_accounter))
+ , builder(cmd.slice, opts, std::move(memory_accounter))
, limit(cmd.row_limit)
, partition_limit(cmd.partition_limit)
, current_partition_range(ranges.begin())
@@ -2908,16 +2908,16 @@ struct query_state {
};

future<lw_shared_ptr<query::result>>
-column_family::query(schema_ptr s, const query::read_command& cmd, query::result_request request,
+column_family::query(schema_ptr s, const query::read_command& cmd, query::result_options opts,
const dht::partition_range_vector& partition_ranges,
tracing::trace_state_ptr trace_state, query::result_memory_limiter& memory_limiter,
uint64_t max_size) {
utils::latency_counter lc;
_stats.reads.set_latency(lc);
- auto f = request == query::result_request::only_digest
+ auto f = opts.request == query::result_request::only_digest
? memory_limiter.new_digest_read(max_size) : memory_limiter.new_data_read(max_size);
- return f.then([this, lc, s = std::move(s), &cmd, request, &partition_ranges, trace_state = std::move(trace_state)] (query::result_memory_accounter accounter) mutable {
- auto qs_ptr = std::make_unique<query_state>(std::move(s), cmd, request, partition_ranges, std::move(accounter));
+ return f.then([this, lc, s = std::move(s), &cmd, opts, &partition_ranges, trace_state = std::move(trace_state)] (query::result_memory_accounter accounter) mutable {
+ auto qs_ptr = std::make_unique<query_state>(std::move(s), cmd, opts, partition_ranges, std::move(accounter));
auto& qs = *qs_ptr;
return do_until(std::bind(&query_state::done, &qs), [this, &qs, trace_state = std::move(trace_state)] {
auto&& range = *qs.current_partition_range++;
@@ -2965,10 +2965,10 @@ std::chrono::milliseconds column_family::get_coordinator_read_latency_percentile
static thread_local auto data_query_stage = seastar::make_execution_stage("data_query", &column_family::query);

future<lw_shared_ptr<query::result>, cache_temperature>
-database::query(schema_ptr s, const query::read_command& cmd, query::result_request request, const dht::partition_range_vector& ranges, tracing::trace_state_ptr trace_state,
+database::query(schema_ptr s, const query::read_command& cmd, query::result_options opts, const dht::partition_range_vector& ranges, tracing::trace_state_ptr trace_state,
uint64_t max_result_size) {
column_family& cf = find_column_family(cmd.cf_id);
- return data_query_stage(&cf, std::move(s), seastar::cref(cmd), request, seastar::cref(ranges),
+ return data_query_stage(&cf, std::move(s), seastar::cref(cmd), opts, seastar::cref(ranges),
std::move(trace_state), seastar::ref(get_result_memory_limiter()),
max_result_size).then_wrapped([this, s = _stats, hit_rate = cf.get_global_cache_hit_rate()] (auto f) {
if (f.failed()) {
diff --git a/mutation.cc b/mutation.cc
index 6f4c2028ba..8fd77f5699 100644
--- a/mutation.cc
+++ b/mutation.cc
@@ -120,20 +120,20 @@ mutation::query(query::result::builder& builder,

query::result
mutation::query(const query::partition_slice& slice,
- query::result_request request,
+ query::result_options opts,
gc_clock::time_point now, uint32_t row_limit) &&
{
- query::result::builder builder(slice, request, { });
+ query::result::builder builder(slice, opts, { });
std::move(*this).query(builder, slice, now, row_limit);
return builder.build();
}

query::result
mutation::query(const query::partition_slice& slice,
- query::result_request request,
+ query::result_options opts,
gc_clock::time_point now, uint32_t row_limit) const&
{
- return mutation(*this).query(slice, request, now, row_limit);
+ return mutation(*this).query(slice, opts, now, row_limit);
}

size_t
diff --git a/mutation_query.cc b/mutation_query.cc
index b501f9705c..0dfc128802 100644
--- a/mutation_query.cc
+++ b/mutation_query.cc
@@ -57,8 +57,8 @@ bool reconcilable_result::operator!=(const reconcilable_result& other) const {
}

query::result
-to_data_query_result(const reconcilable_result& r, schema_ptr s, const query::partition_slice& slice, uint32_t max_rows, uint32_t max_partitions, query::result_request result_type) {
- query::result::builder builder(slice, result_type, { });
+to_data_query_result(const reconcilable_result& r, schema_ptr s, const query::partition_slice& slice, uint32_t max_rows, uint32_t max_partitions, query::result_options opts) {
+ query::result::builder builder(slice, opts, { });
for (const partition& p : r.partitions()) {
if (builder.row_count() >= max_rows || builder.partition_count() >= max_partitions) {
break;
diff --git a/query-result-set.cc b/query-result-set.cc
index f849230d6d..2acca12cc0 100644
--- a/query-result-set.cc
+++ b/query-result-set.cc
@@ -193,7 +193,7 @@ result_set::from_raw_result(schema_ptr s, const partition_slice& slice, const re

result_set::result_set(const mutation& m) : result_set([&m] {
auto slice = partition_slice_builder(*m.schema()).build();
- auto qr = mutation(m).query(slice, result_request::only_result);
+ auto qr = mutation(m).query(slice, result_options::only_result());
return result_set::from_raw_result(m.schema(), slice, qr);
}())
{ }
diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc
index 0caa1b4134..c01434d715 100644
--- a/service/storage_proxy.cc
+++ b/service/storage_proxy.cc
@@ -2448,15 +2448,16 @@ class abstract_read_executor : public enable_shared_from_this<abstract_read_exec
}
future<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature> make_data_request(gms::inet_address ep, clock_type::time_point timeout, bool want_digest) {
++_proxy->_stats.data_read_attempts.get_ep_stat(ep);
+ auto opts = want_digest
+ ? query::result_options{query::result_request::result_and_digest, query::digest_algorithm::MD5}
+ : query::result_options{query::result_request::only_result, query::digest_algorithm::none};
if (is_me(ep)) {
tracing::trace(_trace_state, "read_data: querying locally");
- auto qrr = want_digest ? query::result_request::result_and_digest : query::result_request::only_result;
- return _proxy->query_result_local(_schema, _cmd, _partition_range, qrr, _trace_state);
+ return _proxy->query_result_local(_schema, _cmd, _partition_range, opts, _trace_state);
} else {
auto& ms = netw::get_local_messaging_service();
tracing::trace(_trace_state, "read_data: sending a message to /{}", ep);
- auto da = want_digest ? query::digest_algorithm::MD5 : query::digest_algorithm::none;
- return ms.send_read_data(netw::messaging_service::msg_addr{ep, 0}, timeout, *_cmd, _partition_range, da).then([this, ep](query::result&& result, rpc::optional<cache_temperature> hit_rate) {
+ return ms.send_read_data(netw::messaging_service::msg_addr{ep, 0}, timeout, *_cmd, _partition_range, opts.digest_algo).then([this, ep](query::result&& result, rpc::optional<cache_temperature> hit_rate) {
tracing::trace(_trace_state, "read_data: got response from /{}", ep);
return make_ready_future<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>(make_foreign(::make_lw_shared<query::result>(std::move(result))), hit_rate.value_or(cache_temperature::invalid()));
});
@@ -2845,26 +2846,26 @@ ::shared_ptr<abstract_read_executor> storage_proxy::get_read_executor(lw_shared_

future<query::result_digest, api::timestamp_type, cache_temperature>
storage_proxy::query_result_local_digest(schema_ptr s, lw_shared_ptr<query::read_command> cmd, const dht::partition_range& pr, tracing::trace_state_ptr trace_state, uint64_t max_size) {
- return query_result_local(std::move(s), std::move(cmd), pr, query::result_request::only_digest, std::move(trace_state), max_size).then([] (foreign_ptr<lw_shared_ptr<query::result>> result, cache_temperature hit_rate) {
+ return query_result_local(std::move(s), std::move(cmd), pr, query::result_options::only_digest(query::digest_algorithm::MD5), std::move(trace_state), max_size).then([] (foreign_ptr<lw_shared_ptr<query::result>> result, cache_temperature hit_rate) {
return make_ready_future<query::result_digest, api::timestamp_type, cache_temperature>(*result->digest(), result->last_modified(), hit_rate);
});
}

future<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>
-storage_proxy::query_result_local(schema_ptr s, lw_shared_ptr<query::read_command> cmd, const dht::partition_range& pr, query::result_request request, tracing::trace_state_ptr trace_state, uint64_t max_size) {
+storage_proxy::query_result_local(schema_ptr s, lw_shared_ptr<query::read_command> cmd, const dht::partition_range& pr, query::result_options opts, tracing::trace_state_ptr trace_state, uint64_t max_size) {
if (pr.is_singular()) {
unsigned shard = _db.local().shard_of(pr.start()->value().token());
- return _db.invoke_on(shard, [max_size, gs = global_schema_ptr(s), prv = dht::partition_range_vector({pr}) /* FIXME: pr is copied */, cmd, request, gt = tracing::global_trace_state_ptr(std::move(trace_state))] (database& db) mutable {
+ return _db.invoke_on(shard, [max_size, gs = global_schema_ptr(s), prv = dht::partition_range_vector({pr}) /* FIXME: pr is copied */, cmd, opts, gt = tracing::global_trace_state_ptr(std::move(trace_state))] (database& db) mutable {
tracing::trace(gt, "Start querying the token range that starts with {}", seastar::value_of([&prv] { return prv.begin()->start()->value().token(); }));
- return db.query(gs, *cmd, request, prv, gt, max_size).then([trace_state = gt.get()](auto&& f, cache_temperature ht) {
+ return db.query(gs, *cmd, opts, prv, gt, max_size).then([trace_state = gt.get()](auto&& f, cache_temperature ht) {
tracing::trace(trace_state, "Querying is done");
return make_ready_future<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>(make_foreign(std::move(f)), ht);
});
});
} else {
- return query_nonsingular_mutations_locally(s, cmd, {pr}, std::move(trace_state), max_size).then([s, cmd, request] (foreign_ptr<lw_shared_ptr<reconcilable_result>>&& r, cache_temperature&& ht) {
+ return query_nonsingular_mutations_locally(s, cmd, {pr}, std::move(trace_state), max_size).then([s, cmd, opts] (foreign_ptr<lw_shared_ptr<reconcilable_result>>&& r, cache_temperature&& ht) {
return make_ready_future<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>(
- ::make_foreign(::make_lw_shared(to_data_query_result(*r, s, cmd->slice, cmd->row_limit, cmd->partition_limit, request))), ht);
+ ::make_foreign(::make_lw_shared(to_data_query_result(*r, s, cmd->slice, cmd->row_limit, cmd->partition_limit, opts))), ht);
});
}
}
@@ -3752,16 +3753,10 @@ void storage_proxy::init_messaging_service() {
// this function assumes singular queries but doesn't validate
throw std::runtime_error("READ_DATA called with wrapping range");
}
- query::result_request qrr;
- switch (da) {
- case query::digest_algorithm::none:
- qrr = query::result_request::only_result;
- break;
- default:
- qrr = query::result_request::result_and_digest;
- break;
- }
- return p->query_result_local(std::move(s), cmd, std::move(pr2.first), qrr, trace_state_ptr, max_size);
+ query::result_options opts;
+ opts.digest_algo = da;
+ opts.request = da == query::digest_algorithm::none ? query::result_request::only_result : query::result_request::result_and_digest;
+ return p->query_result_local(std::move(s), cmd, std::move(pr2.first), opts, trace_state_ptr, max_size);
}).finally([&trace_state_ptr, src_ip] () mutable {
tracing::trace(trace_state_ptr, "read_data handling is done, sending a response to /{}", src_ip);
});
diff --git a/tests/database_test.cc b/tests/database_test.cc
index d1c0086267..abb3a8cb7d 100644
--- a/tests/database_test.cc
+++ b/tests/database_test.cc
@@ -54,21 +54,21 @@ SEASTAR_TEST_CASE(test_querying_with_limits) {
auto max_size = std::numeric_limits<size_t>::max();
{
auto cmd = query::read_command(s->id(), s->version(), partition_slice_builder(*s).build(), 3);
- auto result = db.query(s, cmd, query::result_request::only_result, pranges, nullptr, max_size).get0();
+ auto result = db.query(s, cmd, query::result_options::only_result(), pranges, nullptr, max_size).get0();
assert_that(query::result_set::from_raw_result(s, cmd.slice, *result)).has_size(3);
}

{
auto cmd = query::read_command(s->id(), s->version(), partition_slice_builder(*s).build(),
query::max_rows, gc_clock::now(), std::experimental::nullopt, 5);
- auto result = db.query(s, cmd, query::result_request::only_result, pranges, nullptr, max_size).get0();
+ auto result = db.query(s, cmd, query::result_options::only_result(), pranges, nullptr, max_size).get0();
assert_that(query::result_set::from_raw_result(s, cmd.slice, *result)).has_size(5);
}

{
auto cmd = query::read_command(s->id(), s->version(), partition_slice_builder(*s).build(),
query::max_rows, gc_clock::now(), std::experimental::nullopt, 3);
- auto result = db.query(s, cmd, query::result_request::only_result, pranges, nullptr, max_size).get0();
+ auto result = db.query(s, cmd, query::result_options::only_result(), pranges, nullptr, max_size).get0();
assert_that(query::result_set::from_raw_result(s, cmd.slice, *result)).has_size(3);
}
});
diff --git a/tests/memory_footprint.cc b/tests/memory_footprint.cc
index 4f8756ba66..c7d9009453 100644
--- a/tests/memory_footprint.cc
+++ b/tests/memory_footprint.cc
@@ -184,7 +184,7 @@ static sizes calculate_sizes(const mutation& m) {
result.cache = tracker.region().occupancy().used_space() - cache_initial_occupancy;
result.frozen = freeze(m).representation().size();
result.canonical = canonical_mutation(m).representation().size();
- result.query_result = m.query(partition_slice_builder(*s).build(), query::result_request::only_result).buf().size();
+ result.query_result = m.query(partition_slice_builder(*s).build(), query::result_options::only_result()).buf().size();

tmpdir sstable_dir;
auto sst = sstables::make_sstable(s,
diff --git a/tests/mutation_test.cc b/tests/mutation_test.cc
index bfde9f7592..365fda7c84 100644
--- a/tests/mutation_test.cc
+++ b/tests/mutation_test.cc
@@ -990,8 +990,8 @@ SEASTAR_TEST_CASE(test_query_digest) {
auto check_digests_equal = [] (const mutation& m1, const mutation& m2) {
auto ps1 = partition_slice_builder(*m1.schema()).build();
auto ps2 = partition_slice_builder(*m2.schema()).build();
- auto digest1 = *m1.query(ps1, query::result_request::only_digest).digest();
- auto digest2 = *m2.query(ps2, query::result_request::only_digest).digest();
+ auto digest1 = *m1.query(ps1, query::result_options::only_digest(query::digest_algorithm::MD5)).digest();
+ auto digest2 = *m2.query(ps2, query::result_options::only_digest(query::digest_algorithm::MD5)).digest();
if (digest1 != digest2) {
BOOST_FAIL(sprint("Digest should be the same for %s and %s", m1, m2));
}
@@ -1176,7 +1176,8 @@ SEASTAR_TEST_CASE(test_querying_expired_cells) {
.without_clustering_key_columns()
.without_partition_key_columns()
.build();
- return query::result_set::from_raw_result(s, slice, m.query(slice, query::result_request::result_and_digest, t));
+ auto opts = query::result_options{query::result_request::result_and_digest, query::digest_algorithm::MD5};
+ return query::result_set::from_raw_result(s, slice, m.query(slice, opts, t));
};

{
--
2.15.0

Duarte Nunes

<duarte@scylladb.com>
unread,
Nov 30, 2017, 6:00:52 PM11/30/17
to scylladb-dev@googlegroups.com
Introduce the digest_algorithm() function, which encapsulates the
decision of which digest algorithm to use. Right now it is set to MD5,
but future patches will change this.

Signed-off-by: Duarte Nunes <dua...@scylladb.com>
---
service/storage_proxy.cc | 8 ++++++--
1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc
index c01434d715..eace2da7b9 100644
--- a/service/storage_proxy.cc
+++ b/service/storage_proxy.cc
@@ -2401,6 +2401,10 @@ class data_read_resolver : public abstract_read_resolver {
}
};

+query::digest_algorithm digest_algorithm() {
+ return query::digest_algorithm::MD5;
+}
+
class abstract_read_executor : public enable_shared_from_this<abstract_read_executor> {
protected:
using targets_iterator = std::vector<gms::inet_address>::iterator;
@@ -2449,7 +2453,7 @@ class abstract_read_executor : public enable_shared_from_this<abstract_read_exec
future<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature> make_data_request(gms::inet_address ep, clock_type::time_point timeout, bool want_digest) {
++_proxy->_stats.data_read_attempts.get_ep_stat(ep);
auto opts = want_digest
- ? query::result_options{query::result_request::result_and_digest, query::digest_algorithm::MD5}
+ ? query::result_options{query::result_request::result_and_digest, digest_algorithm()}
: query::result_options{query::result_request::only_result, query::digest_algorithm::none};
if (is_me(ep)) {
tracing::trace(_trace_state, "read_data: querying locally");
@@ -2846,7 +2850,7 @@ ::shared_ptr<abstract_read_executor> storage_proxy::get_read_executor(lw_shared_

future<query::result_digest, api::timestamp_type, cache_temperature>
storage_proxy::query_result_local_digest(schema_ptr s, lw_shared_ptr<query::read_command> cmd, const dht::partition_range& pr, tracing::trace_state_ptr trace_state, uint64_t max_size) {
- return query_result_local(std::move(s), std::move(cmd), pr, query::result_options::only_digest(query::digest_algorithm::MD5), std::move(trace_state), max_size).then([] (foreign_ptr<lw_shared_ptr<query::result>> result, cache_temperature hit_rate) {
+ return query_result_local(std::move(s), std::move(cmd), pr, query::result_options::only_digest(digest_algorithm()), std::move(trace_state), max_size).then([] (foreign_ptr<lw_shared_ptr<query::result>> result, cache_temperature hit_rate) {
return make_ready_future<query::result_digest, api::timestamp_type, cache_temperature>(*result->digest(), result->last_modified(), hit_rate);
});
}
--
2.15.0

Duarte Nunes

<duarte@scylladb.com>
unread,
Nov 30, 2017, 6:00:53 PM11/30/17
to scylladb-dev@googlegroups.com
While not strictly needed, specify which algorithm to use when request
a digest from a remote node. This is more flexible than relying on a
cluster wide feature, although that's what we'll do in subsequent
patches. It also makes the verb more consistent with the data request.

Signed-off-by: Duarte Nunes <dua...@scylladb.com>
---
message/messaging_service.hh | 4 ++--
service/storage_proxy.hh | 1 +
message/messaging_service.cc | 6 +++---
service/storage_proxy.cc | 17 +++++++++--------
4 files changed, 15 insertions(+), 13 deletions(-)

diff --git a/message/messaging_service.hh b/message/messaging_service.hh
index 0fa2c0c420..d3bab3b90f 100644
--- a/message/messaging_service.hh
+++ b/message/messaging_service.hh
@@ -333,9 +333,9 @@ class messaging_service : public seastar::async_sharded_service<messaging_servic
future<reconcilable_result, rpc::optional<cache_temperature>> send_read_mutation_data(msg_addr id, clock_type::time_point timeout, const query::read_command& cmd, const dht::partition_range& pr);

// Wrapper for READ_DIGEST
- void register_read_digest(std::function<future<query::result_digest, api::timestamp_type, cache_temperature> (const rpc::client_info&, query::read_command cmd, compat::wrapping_partition_range pr)>&& func);
+ void register_read_digest(std::function<future<query::result_digest, api::timestamp_type, cache_temperature> (const rpc::client_info&, query::read_command cmd, compat::wrapping_partition_range pr, rpc::optional<query::digest_algorithm> digest)>&& func);
void unregister_read_digest();
- future<query::result_digest, rpc::optional<api::timestamp_type>, rpc::optional<cache_temperature>> send_read_digest(msg_addr id, clock_type::time_point timeout, const query::read_command& cmd, const dht::partition_range& pr);
+ future<query::result_digest, rpc::optional<api::timestamp_type>, rpc::optional<cache_temperature>> send_read_digest(msg_addr id, clock_type::time_point timeout, const query::read_command& cmd, const dht::partition_range& pr, query::digest_algorithm da);

// Wrapper for TRUNCATE
void register_truncate(std::function<future<>(sstring, sstring)>&& func);
diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh
index ddc5f27926..360c98a77e 100644
--- a/service/storage_proxy.hh
+++ b/service/storage_proxy.hh
@@ -252,6 +252,7 @@ class storage_proxy : public seastar::async_sharded_service<storage_proxy> /*imp
tracing::trace_state_ptr trace_state,
uint64_t max_size = query::result_memory_limiter::maximum_result_size);
future<query::result_digest, api::timestamp_type, cache_temperature> query_result_local_digest(schema_ptr, lw_shared_ptr<query::read_command> cmd, const dht::partition_range& pr, tracing::trace_state_ptr trace_state,
+ query::digest_algorithm da,
uint64_t max_size = query::result_memory_limiter::maximum_result_size);
future<foreign_ptr<lw_shared_ptr<query::result>>> query_partition_key_range(lw_shared_ptr<query::read_command> cmd, dht::partition_range_vector partition_ranges, db::consistency_level cl, tracing::trace_state_ptr trace_state);
dht::partition_range_vector get_restricted_ranges(const schema& s, dht::partition_range range);
diff --git a/message/messaging_service.cc b/message/messaging_service.cc
index 75be7b52aa..5be4374239 100644
--- a/message/messaging_service.cc
+++ b/message/messaging_service.cc
@@ -851,14 +851,14 @@ future<reconcilable_result, rpc::optional<cache_temperature>> messaging_service:
return send_message_timeout<future<reconcilable_result, rpc::optional<cache_temperature>>>(this, messaging_verb::READ_MUTATION_DATA, std::move(id), timeout, cmd, pr);
}

-void messaging_service::register_read_digest(std::function<future<query::result_digest, api::timestamp_type, cache_temperature> (const rpc::client_info&, query::read_command cmd, compat::wrapping_partition_range pr)>&& func) {
+void messaging_service::register_read_digest(std::function<future<query::result_digest, api::timestamp_type, cache_temperature> (const rpc::client_info&, query::read_command cmd, compat::wrapping_partition_range pr, rpc::optional<query::digest_algorithm> oda)>&& func) {
register_handler(this, netw::messaging_verb::READ_DIGEST, std::move(func));
}
void messaging_service::unregister_read_digest() {
_rpc->unregister_handler(netw::messaging_verb::READ_DIGEST);
}
-future<query::result_digest, rpc::optional<api::timestamp_type>, rpc::optional<cache_temperature>> messaging_service::send_read_digest(msg_addr id, clock_type::time_point timeout, const query::read_command& cmd, const dht::partition_range& pr) {
- return send_message_timeout<future<query::result_digest, rpc::optional<api::timestamp_type>, rpc::optional<cache_temperature>>>(this, netw::messaging_verb::READ_DIGEST, std::move(id), timeout, cmd, pr);
+future<query::result_digest, rpc::optional<api::timestamp_type>, rpc::optional<cache_temperature>> messaging_service::send_read_digest(msg_addr id, clock_type::time_point timeout, const query::read_command& cmd, const dht::partition_range& pr, query::digest_algorithm da) {
+ return send_message_timeout<future<query::result_digest, rpc::optional<api::timestamp_type>, rpc::optional<cache_temperature>>>(this, netw::messaging_verb::READ_DIGEST, std::move(id), timeout, cmd, pr, da);
}

// Wrapper for TRUNCATE
diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc
index eace2da7b9..125061d72f 100644
--- a/service/storage_proxy.cc
+++ b/service/storage_proxy.cc
@@ -2471,11 +2471,11 @@ class abstract_read_executor : public enable_shared_from_this<abstract_read_exec
++_proxy->_stats.digest_read_attempts.get_ep_stat(ep);
if (is_me(ep)) {
tracing::trace(_trace_state, "read_digest: querying locally");
- return _proxy->query_result_local_digest(_schema, _cmd, _partition_range, _trace_state);
+ return _proxy->query_result_local_digest(_schema, _cmd, _partition_range, _trace_state, digest_algorithm());
} else {
auto& ms = netw::get_local_messaging_service();
tracing::trace(_trace_state, "read_digest: sending a message to /{}", ep);
- return ms.send_read_digest(netw::messaging_service::msg_addr{ep, 0}, timeout, *_cmd, _partition_range).then([this, ep] (query::result_digest d, rpc::optional<api::timestamp_type> t,
+ return ms.send_read_digest(netw::messaging_service::msg_addr{ep, 0}, timeout, *_cmd, _partition_range, digest_algorithm()).then([this, ep] (query::result_digest d, rpc::optional<api::timestamp_type> t,
rpc::optional<cache_temperature> hit_rate) {
tracing::trace(_trace_state, "read_digest: got response from /{}", ep);
return make_ready_future<query::result_digest, api::timestamp_type, cache_temperature>(d, t ? t.value() : api::missing_timestamp, hit_rate.value_or(cache_temperature::invalid()));
@@ -2849,8 +2849,8 @@ ::shared_ptr<abstract_read_executor> storage_proxy::get_read_executor(lw_shared_
}

future<query::result_digest, api::timestamp_type, cache_temperature>
-storage_proxy::query_result_local_digest(schema_ptr s, lw_shared_ptr<query::read_command> cmd, const dht::partition_range& pr, tracing::trace_state_ptr trace_state, uint64_t max_size) {
- return query_result_local(std::move(s), std::move(cmd), pr, query::result_options::only_digest(digest_algorithm()), std::move(trace_state), max_size).then([] (foreign_ptr<lw_shared_ptr<query::result>> result, cache_temperature hit_rate) {
+storage_proxy::query_result_local_digest(schema_ptr s, lw_shared_ptr<query::read_command> cmd, const dht::partition_range& pr, tracing::trace_state_ptr trace_state, query::digest_algorithm da, uint64_t max_size) {
+ return query_result_local(std::move(s), std::move(cmd), pr, query::result_options::only_digest(da), std::move(trace_state), max_size).then([] (foreign_ptr<lw_shared_ptr<query::result>> result, cache_temperature hit_rate) {
return make_ready_future<query::result_digest, api::timestamp_type, cache_temperature>(*result->digest(), result->last_modified(), hit_rate);
});
}
@@ -3794,7 +3794,7 @@ void storage_proxy::init_messaging_service() {
});
});
});
- ms.register_read_digest([] (const rpc::client_info& cinfo, query::read_command cmd, compat::wrapping_partition_range pr) {
+ ms.register_read_digest([] (const rpc::client_info& cinfo, query::read_command cmd, compat::wrapping_partition_range pr, rpc::optional<query::digest_algorithm> oda) {
tracing::trace_state_ptr trace_state_ptr;
auto src_addr = netw::messaging_service::get_source(cinfo);
if (cmd.trace_info) {
@@ -3802,17 +3802,18 @@ void storage_proxy::init_messaging_service() {
tracing::begin(trace_state_ptr);
tracing::trace(trace_state_ptr, "read_digest: message received from /{}", src_addr.addr);
}
+ auto da = oda.value_or(query::digest_algorithm::MD5);
auto max_size = cinfo.retrieve_auxiliary<uint64_t>("max_result_size");
- return do_with(std::move(pr), get_local_shared_storage_proxy(), std::move(trace_state_ptr), [&cinfo, cmd = make_lw_shared<query::read_command>(std::move(cmd)), src_addr = std::move(src_addr), max_size] (compat::wrapping_partition_range& pr, shared_ptr<storage_proxy>& p, tracing::trace_state_ptr& trace_state_ptr) mutable {
+ return do_with(std::move(pr), get_local_shared_storage_proxy(), std::move(trace_state_ptr), [&cinfo, cmd = make_lw_shared<query::read_command>(std::move(cmd)), src_addr = std::move(src_addr), da, max_size] (compat::wrapping_partition_range& pr, shared_ptr<storage_proxy>& p, tracing::trace_state_ptr& trace_state_ptr) mutable {
p->_stats.replica_digest_reads++;
auto src_ip = src_addr.addr;
- return get_schema_for_read(cmd->schema_version, std::move(src_addr)).then([cmd, &pr, &p, &trace_state_ptr, max_size] (schema_ptr s) {
+ return get_schema_for_read(cmd->schema_version, std::move(src_addr)).then([cmd, &pr, &p, &trace_state_ptr, da, max_size] (schema_ptr s) {
auto pr2 = compat::unwrap(std::move(pr), *s);
if (pr2.second) {
// this function assumes singular queries but doesn't validate
throw std::runtime_error("READ_DIGEST called with wrapping range");
}
- return p->query_result_local_digest(std::move(s), cmd, std::move(pr2.first), trace_state_ptr, max_size);
+ return p->query_result_local_digest(std::move(s), cmd, std::move(pr2.first), trace_state_ptr, da, max_size);
}).finally([&trace_state_ptr, src_ip] () mutable {
tracing::trace(trace_state_ptr, "read_digest handling is done, sending a response to /{}", src_ip);
});
--
2.15.0

Duarte Nunes

<duarte@scylladb.com>
unread,
Nov 30, 2017, 6:00:55 PM11/30/17
to scylladb-dev@googlegroups.com
We add a cluster feature that informs whether the xxHash algorithm is
supported, and allow nodes to switch to it when possible. We use a
cluster feature because older versions are not ready to receive a
different digest algorithm than MD5 when answering a data request.

Fixes #2884

Signed-off-by: Duarte Nunes <dua...@scylladb.com>
---
digest_algorithm.hh | 4 ++--
idl/query.idl.hh | 3 ++-
service/storage_service.hh | 6 ++++++
service/storage_proxy.cc | 4 +++-
service/storage_service.cc | 3 +++
5 files changed, 16 insertions(+), 4 deletions(-)

diff --git a/digest_algorithm.hh b/digest_algorithm.hh
index 9a7a35cd7d..d8a6bfb45f 100644
--- a/digest_algorithm.hh
+++ b/digest_algorithm.hh
@@ -27,8 +27,8 @@ constexpr size_t digest_size = 16;

enum class digest_algorithm : uint8_t {
none = 0, // digest not required
- MD5 = 1, // default algorithm
- xxHash = 2,
+ MD5 = 1,
+ xxHash = 2,// default algorithm
};

}
diff --git a/idl/query.idl.hh b/idl/query.idl.hh
index b2aa14dbc8..6aca9a59fa 100644
--- a/idl/query.idl.hh
+++ b/idl/query.idl.hh
@@ -31,7 +31,8 @@ class query_result stub [[writable]] {

enum class digest_algorithm : uint8_t {
none = 0, // digest not required
- MD5 = 1, // default algorithm
+ MD5 = 1,
+ xxHash = 2,// default algorithm
};

}
diff --git a/service/storage_service.hh b/service/storage_service.hh
index 4ddfa485cc..44b062518e 100644
--- a/service/storage_service.hh
+++ b/service/storage_service.hh
@@ -270,6 +270,7 @@ class storage_service : public service::migration_listener, public gms::i_endpoi
gms::feature _correct_counter_order_feature;
gms::feature _schema_tables_v3;
gms::feature _correct_non_compound_range_tombstones;
+ gms::feature _xxhash_feature;
public:
void enable_all_features() {
_range_tombstones_feature.enable();
@@ -281,6 +282,7 @@ class storage_service : public service::migration_listener, public gms::i_endpoi
_correct_counter_order_feature.enable();
_schema_tables_v3.enable();
_correct_non_compound_range_tombstones.enable();
+ _xxhash_feature.enable();
}

void finish_bootstrapping() {
@@ -2249,6 +2251,10 @@ class storage_service : public service::migration_listener, public gms::i_endpoi
bool cluster_supports_reading_correctly_serialized_range_tombstones() const {
return bool(_correct_non_compound_range_tombstones);
}
+
+ bool cluster_supports_xxhash_digest_algorithm() const {
+ return bool(_xxhash_feature);
+ }
};

inline future<> init_storage_service(distributed<database>& db, sharded<auth::service>& auth_service) {
diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc
index 125061d72f..ebb7ac4d4d 100644
--- a/service/storage_proxy.cc
+++ b/service/storage_proxy.cc
@@ -2402,7 +2402,9 @@ class data_read_resolver : public abstract_read_resolver {
};

query::digest_algorithm digest_algorithm() {
- return query::digest_algorithm::MD5;
+ return service::get_local_storage_service().cluster_supports_xxhash_digest_algorithm()
+ ? query::digest_algorithm::xxHash
+ : query::digest_algorithm::MD5;
}

class abstract_read_executor : public enable_shared_from_this<abstract_read_executor> {
diff --git a/service/storage_service.cc b/service/storage_service.cc
index 4116809530..e0f5e2fe56 100644
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -90,6 +90,7 @@ static const sstring DIGEST_MULTIPARTITION_READ_FEATURE = "DIGEST_MULTIPARTITION
static const sstring CORRECT_COUNTER_ORDER_FEATURE = "CORRECT_COUNTER_ORDER";
static const sstring SCHEMA_TABLES_V3 = "SCHEMA_TABLES_V3";
static const sstring CORRECT_NON_COMPOUND_RANGE_TOMBSTONES = "CORRECT_NON_COMPOUND_RANGE_TOMBSTONES";
+static const sstring XXHASH_FEATURE = "XXHASH";

distributed<storage_service> _the_storage_service;

@@ -136,6 +137,7 @@ sstring storage_service::get_config_supported_features() {
CORRECT_COUNTER_ORDER_FEATURE,
SCHEMA_TABLES_V3,
CORRECT_NON_COMPOUND_RANGE_TOMBSTONES,
+ XXHASH_FEATURE,
};
if (service::get_local_storage_service()._db.local().get_config().experimental()) {
features.push_back(MATERIALIZED_VIEWS_FEATURE);
@@ -347,6 +349,7 @@ void storage_service::register_features() {
_correct_counter_order_feature = gms::feature(CORRECT_COUNTER_ORDER_FEATURE);
_schema_tables_v3 = gms::feature(SCHEMA_TABLES_V3);
_correct_non_compound_range_tombstones = gms::feature(CORRECT_NON_COMPOUND_RANGE_TOMBSTONES);
+ _xxhash_feature = gms::feature(XXHASH_FEATURE);

if (_db.local().get_config().experimental()) {
_materialized_views_feature = gms::feature(MATERIALIZED_VIEWS_FEATURE);
--
2.15.0

Duarte Nunes

<duarte@scylladb.com>
unread,
Nov 30, 2017, 6:00:55 PM11/30/17
to scylladb-dev@googlegroups.com
Signed-off-by: Duarte Nunes <dua...@scylladb.com>
---
schema.cc | 1 -
1 file changed, 1 deletion(-)

diff --git a/schema.cc b/schema.cc
index c4de94fe04..40cf4afccb 100644
--- a/schema.cc
+++ b/schema.cc
@@ -24,7 +24,6 @@
#include "cql3/util.hh"
#include "schema.hh"
#include "schema_builder.hh"
-#include "md5_hasher.hh"
#include <boost/algorithm/cxx11/any_of.hpp>
#include <boost/range/adaptor/transformed.hpp>
#include "db/marshal/type_parser.hh"
--
2.15.0

Duarte Nunes

<duarte@scylladb.com>
unread,
Nov 30, 2017, 6:00:56 PM11/30/17
to scylladb-dev@googlegroups.com
Signed-off-by: Duarte Nunes <dua...@scylladb.com>
---
tests/mutation_test.cc | 37 +++++++++++++++++++++----------------
1 file changed, 21 insertions(+), 16 deletions(-)

diff --git a/tests/mutation_test.cc b/tests/mutation_test.cc
index 365fda7c84..35e836757e 100644
--- a/tests/mutation_test.cc
+++ b/tests/mutation_test.cc
@@ -26,6 +26,7 @@
#include <boost/range/algorithm_ext/push_back.hpp>
#include "mutation_query.hh"
#include "md5_hasher.hh"
+#include "xx_hasher.hh"

#include "core/sstring.hh"
#include "core/do_with.hh"
@@ -958,23 +959,27 @@ SEASTAR_TEST_CASE(test_mutation_equality) {
SEASTAR_TEST_CASE(test_mutation_hash) {
return seastar::async([] {
for_each_mutation_pair([] (auto&& m1, auto&& m2, are_equal eq) {
- auto get_hash = [] (const mutation& m) {
- md5_hasher h;
- feed_hash(h, m);
- return h.finalize();
- };
- auto h1 = get_hash(m1);
- auto h2 = get_hash(m2);
- if (eq) {
- if (h1 != h2) {
- BOOST_FAIL(sprint("Hash should be equal for %s and %s", m1, m2));
- }
- } else {
- // We're using a strong hasher, collision should be unlikely
- if (h1 == h2) {
- BOOST_FAIL(sprint("Hash should be different for %s and %s", m1, m2));
+ auto test_with_hasher = [&] (auto hasher) {
+ auto get_hash = [&] (const mutation &m) {
+ auto h = hasher;
+ feed_hash(h, m);
+ return h.finalize();
+ };
+ auto h1 = get_hash(m1);
+ auto h2 = get_hash(m2);
+ if (eq) {
+ if (h1 != h2) {
+ BOOST_FAIL(sprint("Hash should be equal for %s and %s", m1, m2));
+ }
+ } else {
+ // We're using a strong hasher, collision should be unlikely
+ if (h1 == h2) {
+ BOOST_FAIL(sprint("Hash should be different for %s and %s", m1, m2));
+ }
}
- }
+ };
+ test_with_hasher(md5_hasher());
+ test_with_hasher(xx_hasher());
});
});
}
--
2.15.0

Duarte Nunes

<duarte@scylladb.com>
unread,
Nov 30, 2017, 6:00:57 PM11/30/17
to scylladb-dev@googlegroups.com
Signed-off-by: Duarte Nunes <dua...@scylladb.com>
---
tests/mutation_test.cc | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/tests/mutation_test.cc b/tests/mutation_test.cc
index 35e836757e..55c5f32a64 100644
--- a/tests/mutation_test.cc
+++ b/tests/mutation_test.cc
@@ -995,8 +995,8 @@ SEASTAR_TEST_CASE(test_query_digest) {
auto check_digests_equal = [] (const mutation& m1, const mutation& m2) {
auto ps1 = partition_slice_builder(*m1.schema()).build();
auto ps2 = partition_slice_builder(*m2.schema()).build();
- auto digest1 = *m1.query(ps1, query::result_options::only_digest(query::digest_algorithm::MD5)).digest();
- auto digest2 = *m2.query(ps2, query::result_options::only_digest(query::digest_algorithm::MD5)).digest();
+ auto digest1 = *m1.query(ps1, query::result_options::only_digest(query::digest_algorithm::xxHash)).digest();
+ auto digest2 = *m2.query(ps2, query::result_options::only_digest(query::digest_algorithm::xxHash)).digest();
if (digest1 != digest2) {
BOOST_FAIL(sprint("Digest should be the same for %s and %s", m1, m2));
}
@@ -1181,7 +1181,7 @@ SEASTAR_TEST_CASE(test_querying_expired_cells) {
.without_clustering_key_columns()
.without_partition_key_columns()
.build();
- auto opts = query::result_options{query::result_request::result_and_digest, query::digest_algorithm::MD5};
+ auto opts = query::result_options{query::result_request::result_and_digest, query::digest_algorithm::xxHash};

Raphael S. Carvalho

<raphaelsc@scylladb.com>
unread,
Nov 30, 2017, 6:25:35 PM11/30/17
to Duarte Nunes, scylladb-dev
I think you possibly pasted the wrong results for xxHash?! seems the same as MD5.
 

Using a normal cassandra-stress run, one can already see a (small)
improvement. With MD5:

Results:
op rate                   : 16007 [READ:16007]
latency mean              : 6.2 [READ:6.2]
latency median            : 5.3 [READ:5.3]
latency 95th percentile   : 11.6 [READ:11.6]
latency 99th percentile   : 13.7 [READ:13.7]
latency 99.9th percentile : 16.0 [READ:16.0]
latency max               : 27.5 [READ:27.5]
Total partitions          : 10000000 [READ:10000000]
Total operation time      : 00:10:24

With xxHash:

Results:
op rate                   : 17643 [READ:17643]
latency mean              : 5.7 [READ:5.7]
latency median            : 5.0 [READ:5.0]
latency 95th percentile   : 10.0 [READ:10.0]
latency 99th percentile   : 12.0 [READ:12.0]
latency 99.9th percentile : 14.1 [READ:14.1]
latency max               : 26.5 [READ:26.5]
Total partitions          : 10000000 [READ:10000000]
Total operation time      : 00:09:26

cool!
 

Fixes #2884

Also in:
    Also in:
    g...@github.com:duarten/scylla.git  xxhash/v1
    https://github.com/duarten/scylla/tree/xxhash/v1

Sad note: I wish I knew how to configure cassandra-stress to use larger
values.

IIRC, cassandra-stress yaml file allows you to specify size distribution for a given column
 

--
2.15.0

--
You received this message because you are subscribed to the Google Groups "ScyllaDB development" group.
To unsubscribe from this group and stop receiving emails from it, send an email to scylladb-dev+unsubscribe@googlegroups.com.
To post to this group, send email to scylla...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/scylladb-dev/20171130230038.31318-1-duarte%40scylladb.com.
For more options, visit https://groups.google.com/d/optout.

Duarte Nunes

<duarte@scylladb.com>
unread,
Nov 30, 2017, 6:33:28 PM11/30/17
to Raphael S. Carvalho, scylladb-dev
Ooops.
I wish there was a youtube video explaining just how to do that!

Duarte Nunes

<duarte@scylladb.com>
unread,
Nov 30, 2017, 6:34:22 PM11/30/17
to scylladb-dev@googlegroups.com
This should be replaced with:

size = 256; elapsed = 2us
size = 512; elapsed = 1us
size = 1024; elapsed = 1us
size = 2048; elapsed = 2us
size = 4096; elapsed = 2us
size = 8192; elapsed = 3us
size = 16384; elapsed = 5us
size = 32768; elapsed = 8us
size = 65536; elapsed = 14us
size = 131072; elapsed = 28us
size = 262144; elapsed = 59us
size = 524288; elapsed = 116us
size = 1048576; elapsed = 226us
size = 2097152; elapsed = 456us
size = 4194304; elapsed = 935us
size = 8388608; elapsed = 1848us
size = 16777216; elapsed = 4723us
size = 33554432; elapsed = 10507us
size = 67108864; elapsed = 21622us

Raphael S. Carvalho

<raphaelsc@scylladb.com>
unread,
Nov 30, 2017, 6:57:58 PM11/30/17
to Duarte Nunes, scylladb-dev
On Thu, Nov 30, 2017 at 9:00 PM, Duarte Nunes <dua...@scylladb.com> wrote:
from xxHash/blob/dev/xxhash.h:
"The function returns an error code, with 0 meaning OK, and any other value meaning there is an error."

not sure what kind of error there could be, but I think it may be worth handling it.

 
+    }
+
+    uint64_t finalize() {
+        return XXH64_digest(_state.get());
+    }
+};
--
2.15.0

--
You received this message because you are subscribed to the Google Groups "ScyllaDB development" group.
To unsubscribe from this group and stop receiving emails from it, send an email to scylladb-dev+unsubscribe@googlegroups.com.
To post to this group, send email to scylla...@googlegroups.com.

Duarte Nunes

<duarte@scylladb.com>
unread,
Nov 30, 2017, 7:17:25 PM11/30/17
to Raphael S. Carvalho, scylladb-dev
It only returns an error if the input is null (otherwise it just does math), which we must ensure it's not.

Paweł Dziepak

<pdziepak@scylladb.com>
unread,
Dec 1, 2017, 5:43:12 AM12/1/17
to Duarte Nunes, scylladb-dev
Is there source code for this test?

Any particular reason why xxHash64 was chosen and not something else (CityHash looks quite good as well, FarmHash even better if SSE4.2 is available which we do require anyway)?
 
--
You received this message because you are subscribed to the Google Groups "ScyllaDB development" group.
To unsubscribe from this group and stop receiving emails from it, send an email to scylladb-dev+unsubscribe@googlegroups.com.
To post to this group, send email to scylla...@googlegroups.com.

Duarte Nunes

<duarte@scylladb.com>
unread,
Dec 1, 2017, 6:00:17 AM12/1/17
to Paweł Dziepak, scylladb-dev
Yes, see the issue.



Any particular reason why xxHash64 was chosen and not something else (CityHash looks quite good as well, FarmHash even better if SSE4.2 is available which we do require anyway)?

Just that it's fast. Are there benchmarks and Q.Score for those?

Paweł Dziepak

<pdziepak@scylladb.com>
unread,
Dec 1, 2017, 12:11:16 PM12/1/17
to Duarte Nunes, scylladb-dev
Well, I haven't done any proper research but there is this [1] and this [2] for starters (though, I fail to interpret the table in the second link in any meaningful way).

BTW If we are changing hashing algorithm perhaps we should also precompute hashes of large values (or at least modify the way we hash them so that it is possible to do so later without any compatibility problems).

Duarte Nunes

<duarte@scylladb.com>
unread,
Dec 1, 2017, 12:49:53 PM12/1/17
to Paweł Dziepak, scylladb-dev
In the issue I proposed we store lazily calculated values of large cells in the cache, as metadata.
[1] says "xxHash64 wins at larger (0.5KB+) data sizes, followed closely by 64 bit FarmHash and CityHash"

[2] says "So the fastest hash functions on x86_64 without quality problems are:

falkhash (macho64 and elf64 nasm only, with HW AES extension)
t1ha + mum (machine specific, mum: different arch results)
FarmHash (not portable, too machine specific: 64 vs 32bit, old gcc, ...)
Metro (but not 64crc yet, WIP)
Spooky32
xxHash64
fasthash
City (deprecated)"

Duarte Nunes

<duarte@scylladb.com>
unread,
Dec 4, 2017, 6:20:08 AM12/4/17
to scylladb-dev@googlegroups.com
On 30/11/2017 23:00, Duarte Nunes wrote:
pings

Avi Kivity

<avi@scylladb.com>
unread,
Dec 4, 2017, 6:58:50 AM12/4/17
to Duarte Nunes, scylladb-dev@googlegroups.com
Please adjust licenses/ too.

Why xxhash and not others?

Avi Kivity

<avi@scylladb.com>
unread,
Dec 4, 2017, 6:59:55 AM12/4/17
to Duarte Nunes, scylladb-dev@googlegroups.com


On 12/01/2017 01:00 AM, Duarte Nunes wrote:
> Signed-off-by: Duarte Nunes <dua...@scylladb.com>
> ---
> .gitmodules | 3 +++
> xxHash | 1 +
> 2 files changed, 4 insertions(+)
> create mode 160000 xxHash
>
> diff --git a/.gitmodules b/.gitmodules
> index 18e14ee560..6af0a36401 100644
> --- a/.gitmodules
> +++ b/.gitmodules
> @@ -9,3 +9,6 @@
> [submodule "dist/ami/files/scylla-ami"]
> path = dist/ami/files/scylla-ami
> url = ../scylla-ami
> +[submodule "xxHash"]
> + path = xxHash
> + url = g...@github.com:Cyan4973/xxHash.git

Oh, and our M.O. is to clone the repo and refer to the clone. This way,
if the repo you reference dies, we still have our clone.

Duarte Nunes

<duarte@scylladb.com>
unread,
Dec 4, 2017, 7:06:09 AM12/4/17
to Avi Kivity, scylladb-dev@googlegroups.com
Will fix.




Why xxhash and not others?

I thought you have done exhaustive research!

I can add the rationale to the commit message. According to https://aras-p.info/blog/2016/08/09/More-Hash-Function-Tests/
and https://github.com/rurban/smhasher, it's a good choice.

Duarte Nunes

<duarte@scylladb.com>
unread,
Dec 4, 2017, 7:06:42 AM12/4/17
to Avi Kivity, scylladb-dev@googlegroups.com
On 04/12/2017 11:59, Avi Kivity wrote:
>
>
> On 12/01/2017 01:00 AM, Duarte Nunes wrote:
>> Signed-off-by: Duarte Nunes <dua...@scylladb.com>
>> ---
>>   .gitmodules | 3 +++
>>   xxHash      | 1 +
>>   2 files changed, 4 insertions(+)
>>   create mode 160000 xxHash
>>
>> diff --git a/.gitmodules b/.gitmodules
>> index 18e14ee560..6af0a36401 100644
>> --- a/.gitmodules
>> +++ b/.gitmodules
>> @@ -9,3 +9,6 @@
>>   [submodule "dist/ami/files/scylla-ami"]
>>       path = dist/ami/files/scylla-ami
>>       url = ../scylla-ami
>> +[submodule "xxHash"]
>> +    path = xxHash
>> +    url = g...@github.com:Cyan4973/xxHash.git
>
> Oh, and our M.O. is to clone the repo and refer to the clone. This way, if the repo you reference dies, we still have
> our clone.

Oh! Can I clone repos to our org?

Paweł Dziepak

<pdziepak@scylladb.com>
unread,
Dec 4, 2017, 7:10:42 AM12/4/17
to Duarte Nunes, scylladb-dev
My point was that such change will probably require an incompatible change in the way we compute the hash, because the final digest won't be computed by just pushing the value to the hasher but a pre-computed hash of the value (in other words, the final digest would be a hash of value hashes). Since this series is already introducing new hashing algorithm it may be worthwhile to do this fully as intended. Otherwise, soon we will have to introduce yet another way of computing the hash.

[1] says "xxHash64 wins at larger (0.5KB+) data sizes, followed closely by 64 bit FarmHash and CityHash"

This is exactly what doesn't matter for us. For large blobs we will use precomputed hash so while we need good speed for that as well it is better to have the fastest algorithm for small values and "just" fast one for the big blobs. Another option would be to use different algorithms depending on the value size, but that will require exposing that threshold in the cluster, probably not a big problem though.
 

[2] says "So the fastest hash functions on x86_64 without quality problems are:

falkhash (macho64 and elf64 nasm only, with HW AES extension)
t1ha + mum (machine specific, mum: different arch results)
FarmHash (not portable, too machine specific: 64 vs 32bit, old gcc, ...)
Metro (but not 64crc yet, WIP)
Spooky32
xxHash64
fasthash
City (deprecated)"

I don't have much confidence in any of these results because the values of MB/s and cyclces/hash doesn't seem to make any sense (or a proper explaination is missing, either way, I'm reluctant to trust such data). CityHash is "deprecated" in a way that FarmHash is its successor. FarmHash seems to work on other platforms as well though it is not as fast (no idea how big the difference is). Metro looks quite promising as well.
Anyway, these links were probably the places were we should start research, not end it.

Paweł Dziepak

<pdziepak@scylladb.com>
unread,
Dec 4, 2017, 7:12:18 AM12/4/17
to Duarte Nunes, scylladb-dev
Why not boost::variant?
 
--
2.15.0

--
You received this message because you are subscribed to the Google Groups "ScyllaDB development" group.
To unsubscribe from this group and stop receiving emails from it, send an email to scylladb-dev+unsubscribe@googlegroups.com.
To post to this group, send email to scylla...@googlegroups.com.

Avi Kivity

<avi@scylladb.com>
unread,
Dec 4, 2017, 7:15:16 AM12/4/17
to Duarte Nunes, scylladb-dev@googlegroups.com


On 12/04/2017 02:06 PM, Duarte Nunes wrote:
> On 04/12/2017 11:59, Avi Kivity wrote:
>>
>>
>> On 12/01/2017 01:00 AM, Duarte Nunes wrote:
>>> Signed-off-by: Duarte Nunes <dua...@scylladb.com>
>>> ---
>>>   .gitmodules | 3 +++
>>>   xxHash      | 1 +
>>>   2 files changed, 4 insertions(+)
>>>   create mode 160000 xxHash
>>>
>>> diff --git a/.gitmodules b/.gitmodules
>>> index 18e14ee560..6af0a36401 100644
>>> --- a/.gitmodules
>>> +++ b/.gitmodules
>>> @@ -9,3 +9,6 @@
>>>   [submodule "dist/ami/files/scylla-ami"]
>>>       path = dist/ami/files/scylla-ami
>>>       url = ../scylla-ami
>>> +[submodule "xxHash"]
>>> +    path = xxHash
>>> +    url = g...@github.com:Cyan4973/xxHash.git
>>
>> Oh, and our M.O. is to clone the repo and refer to the clone. This
>> way, if the repo you reference dies, we still have our clone.
>
> Oh! Can I clone repos to our org?

Don't know, but let's agree on the hash first.

Avi Kivity

<avi@scylladb.com>
unread,
Dec 4, 2017, 7:19:14 AM12/4/17
to Duarte Nunes, scylladb-dev@googlegroups.com


On 12/01/2017 01:00 AM, Duarte Nunes wrote:
> This patch introduces xx_hasher, a class conforming to the Hasher
> concept, which will be used to calculate the data digest in subsequent
> patches. It is expected to be an order of magnitude faster than md5.
>
> We use the 64 bit variant of the algorithm, the 128 bit one still
> being under development.
>
> Signed-off-by: Duarte Nunes <dua...@scylladb.com>
> ---
> xx_hasher.hh | 63 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
> 1 file changed, 63 insertions(+)
> create mode 100644 xx_hasher.hh
>
> diff --git a/xx_hasher.hh b/xx_hasher.hh
> new file mode 100644
> index 0000000000..0080e796a1
> --- /dev/null
> +++ b/xx_hasher.hh
> @@ -0,0 +1,63 @@
> +/*
> + * Copyright (C) 2017 ScyllaDB
> + */
> +
> +/*
> + * This file is part of Scylla.
> + *
> + * Scylla is free software: you can redistribute it and/or modify
> + * it under the terms of the GNU Affero General Public License as published by
> + * the Free Software Foundation, either version 3 of the License, or
> + * (at your option) any later version.
> + *
> + * Scylla is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> + * GNU General Public License for more details.
> + *
> + * You should have received a copy of the GNU General Public License
> + * along with Scylla. If not, see <http://www.gnu.org/licenses/>.
> + */
> +
> +#pragma once
> +
> +#include "bytes.hh"
> +#include <xxHash/xxhash.h>
> +#include <memory>
> +
> +class xx_hasher {
> + struct xxhash_state_deleter {
> + void operator()(XXH64_state_t* x) const { XXH64_freeState(x); }
> + };

How much state does this thing need? can't we allocate it as a member?

> +
> + std::unique_ptr<XXH64_state_t, xxhash_state_deleter> _state;
> +public:
> + xx_hasher() : _state(std::unique_ptr<XXH64_state_t, xxhash_state_deleter>(XXH64_createState())) {
> + XXH64_reset(_state.get(), 0);
> + }
> +
> + xx_hasher(xx_hasher&&) noexcept = default;
> +
> + xx_hasher& operator=(xx_hasher&&) noexcept = default;
> +
> + xx_hasher(const xx_hasher& other) : xx_hasher() {
> + XXH64_copyState(_state.get(), other._state.get());
> + }
> +
> + xx_hasher& operator=(const xx_hasher& other) {
> + if (this != &other) {
> + auto tmp = other;
> + this->~xx_hasher();
> + new (this) xx_hasher(std::move(tmp));
> + }

Pawel ruined this style of assignop for me (even though this one
probably doesn't invoke undefined behavior). Let's find an alternative
idiomatic style. But for now this can stay.

> + return *this;
> + }
> +
> + void update(const char* ptr, size_t length) {
> + XXH64_update(_state.get(), ptr, length);

Avi Kivity

<avi@scylladb.com>
unread,
Dec 4, 2017, 7:20:46 AM12/4/17
to Duarte Nunes, scylladb-dev@googlegroups.com


On 12/01/2017 01:00 AM, Duarte Nunes wrote:
> Signed-off-by: Duarte Nunes <dua...@scylladb.com>
> ---
> md5_hasher.hh | 8 +++++---
> 1 file changed, 5 insertions(+), 3 deletions(-)
>
> diff --git a/md5_hasher.hh b/md5_hasher.hh
> index 0de64e5b33..379aff14ac 100644
> --- a/md5_hasher.hh
> +++ b/md5_hasher.hh
> @@ -30,19 +30,21 @@
> class md5_hasher {
> CryptoPP::Weak::MD5 hash{};
> public:
> + static constexpr size_t size = CryptoPP::Weak::MD5::DIGESTSIZE;
> +
> void update(const char* ptr, size_t length) {
> static_assert(sizeof(char) == sizeof(byte), "Assuming lengths will be the same");
> hash.Update(reinterpret_cast<const byte*>(ptr), length * sizeof(byte));
> }
>
> bytes finalize() {
> - bytes digest{bytes::initialized_later(), CryptoPP::Weak::MD5::DIGESTSIZE};
> + bytes digest{bytes::initialized_later(), size};
> hash.Final(reinterpret_cast<unsigned char*>(digest.begin()));
> return digest;
> }
>
> - std::array<uint8_t, CryptoPP::Weak::MD5::DIGESTSIZE> finalize_array() {
> - std::array<uint8_t, CryptoPP::Weak::MD5::DIGESTSIZE> array;
> + std::array<uint8_t, size> finalize_array() {
> + std::array<uint8_t, size> array;
> hash.Final(reinterpret_cast<unsigned char*>(array.data()));
> return array;

Maybe we should standardize on returning std::array<> for hashes? The
other one returns uint64_t.

> }

Avi Kivity

<avi@scylladb.com>
unread,
Dec 4, 2017, 7:23:00 AM12/4/17
to Duarte Nunes, scylladb-dev@googlegroups.com


On 12/01/2017 01:00 AM, Duarte Nunes wrote:
> This patch paves the way for us to encapsulate the actual digest
> algorithm used for a query. The digester class dispatches to a
> concrete implementation based on the digest algorithm being used. It
> wraps the xxHash algorithm to provide a 128 bit hash, which is the
> size of digest expected by the inter-node protocol.
>
> Signed-off-by: Duarte Nunes <dua...@scylladb.com>
> ---
> digest_algorithm.hh | 2 +
> digester.hh | 173 ++++++++++++++++++++++++++++++++++++++++++++++++++++
> 2 files changed, 175 insertions(+)
> create mode 100644 digester.hh
>
> diff --git a/digest_algorithm.hh b/digest_algorithm.hh
> index 26e31b0c11..9a7a35cd7d 100644
> --- a/digest_algorithm.hh
> +++ b/digest_algorithm.hh
> @@ -23,6 +23,8 @@
>
> namespace query {
>
> +constexpr size_t digest_size = 16;
> +
> enum class digest_algorithm : uint8_t {
> none = 0, // digest not required
> MD5 = 1, // default algorithm
> diff --git a/digester.hh b/digester.hh
> new file mode 100644
> index 0000000000..651ece024f
> --- /dev/null
> +++ b/digester.hh
> @@ -0,0 +1,173 @@
> +/*
> + * Copyright (C) 2017 ScyllaDB
> + */
> +
> +/*
> + * This file is part of Scylla.
> + *
> + * Scylla is free software: you can redistribute it and/or modify
> + * it under the terms of the GNU Affero General Public License as published by
> + * the Free Software Foundation, either version 3 of the License, or
> + * (at your option) any later version.
> + *
> + * Scylla is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> + * GNU General Public License for more details.
> + *
> + * You should have received a copy of the GNU General Public License
> + * along with Scylla. If not, see <http://www.gnu.org/licenses/>.
> + */
> +
> +#pragma once
> +
> +#include "digest_algorithm.hh"
> +#include "md5_hasher.hh"
> +#include "utils/serialization.hh"
> +#include "xx_hasher.hh"
> +
> +namespace query {
> +
> +class digester final {
> + static_assert(md5_hasher::size == digest_size, "MD5 hash size needs to match the digest size");
> +
> + struct xx_hasher_wrapper : public xx_hasher {
> + bytes finalize() {
> + bytes digest{bytes::initialized_later(), digest_size};
> + serialize_to(digest.begin());
> + return digest;
> + }
> +
> + std::array<uint8_t, digest_size> finalize_array() {
> + std::array<uint8_t, digest_size> digest;
> + serialize_to(digest.begin());
> + return digest;
> + }
> +
> + template<typename OutIterator>
> + void serialize_to(OutIterator&& out) {
> + serialize_int64(out, 0);
> + serialize_int64(out, xx_hasher::finalize());
> + }
> + };
> +
> + union concrete_hasher {
> + md5_hasher md5;
> + xx_hasher_wrapper xx;
> +
> + concrete_hasher() { }
> + ~concrete_hasher() { }
> + } _impl;
> + digest_algorithm _algo;
> + if (this != &other) {
> + this->~digester();
> + new (this) digester(std::move(other));
> + }
> + return *this;
> + }
> +
> + digester& operator=(const digester& other) noexcept {
> + if (this != &other) {
> + auto tmp = other;
> + this->~digester();
> + new (this) digester(std::move(tmp));
> + }
> + return *this;
> + }
> +
> + void update(const char* ptr, size_t length) {
> + switch (_algo) {
> + case digest_algorithm::MD5:
> + _impl.md5.update(ptr, length);
> + break;
> + case digest_algorithm::xxHash:
> + _impl.xx.update(ptr, length);
> + break;
> + case digest_algorithm ::none:
> + break;

Will we go through this switch on every piece of data? Or is it done
once per fragment/whatever?

Duarte Nunes

<duarte@scylladb.com>
unread,
Dec 4, 2017, 7:24:15 AM12/4/17
to Paweł Dziepak, scylladb-dev
I don't follow. What values do we pre-compute a hash for? How do we pick them? Is ther an issue for this?



[1] says "xxHash64 wins at larger (0.5KB+) data sizes, followed closely by 64 bit FarmHash and CityHash"

This is exactly what doesn't matter for us. For large blobs we will use precomputed hash so while we need good speed for that as well it is better to have the fastest algorithm for small values and "just" fast one for the big blobs. Another option would be to use different algorithms depending on the value size, but that will require exposing that threshold in the cluster, probably not a big problem though.

The latency hits this aims to solve are due to big values. For low values, latency will be dominated by something else. I don't think it's a good idea to be adding cluster features to define hash thresholds. 

 

[2] says "So the fastest hash functions on x86_64 without quality problems are:

falkhash (macho64 and elf64 nasm only, with HW AES extension)
t1ha + mum (machine specific, mum: different arch results)
FarmHash (not portable, too machine specific: 64 vs 32bit, old gcc, ...)
Metro (but not 64crc yet, WIP)
Spooky32
xxHash64
fasthash
City (deprecated)"

I don't have much confidence in any of these results because the values of MB/s and cyclces/hash doesn't seem to make any sense (or a proper explaination is missing, either way, I'm reluctant to trust such data). CityHash is "deprecated" in a way that FarmHash is its successor. FarmHash seems to work on other platforms as well though it is not as fast (no idea how big the difference is). Metro looks quite promising as well.
Anyway, these links were probably the places were we should start research, not end it.

Metro hash seems stuck in 2015. When I google, xxHash seemed to have good adoption (supported in projects like OpenHFT). There's SeaHash too, which gets bonus points for being in rust.

Anyway, I don't really have the cycles to embark on a hash function research project.

Duarte Nunes

<duarte@scylladb.com>
unread,
Dec 4, 2017, 7:25:14 AM12/4/17
to Paweł Dziepak, scylladb-dev
Would I have to write a bunch of those ugly visitors?

Avi Kivity

<avi@scylladb.com>
unread,
Dec 4, 2017, 7:27:05 AM12/4/17
to Duarte Nunes, scylladb-dev@googlegroups.com


On 12/01/2017 01:00 AM, Duarte Nunes wrote:
> Using a normal cassandra-stress run, one can already see a (small)
> improvement. With MD5:
>
> Results:
> op rate : 16007 [READ:16007]
> latency mean : 6.2 [READ:6.2]
> latency median : 5.3 [READ:5.3]
> latency 95th percentile : 11.6 [READ:11.6]
> latency 99th percentile : 13.7 [READ:13.7]
> latency 99.9th percentile : 16.0 [READ:16.0]
> latency max : 27.5 [READ:27.5]
> Total partitions : 10000000 [READ:10000000]
> Total operation time : 00:10:24
>
> With xxHash:
>
> Results:
> op rate : 17643 [READ:17643]
> latency mean : 5.7 [READ:5.7]
> latency median : 5.0 [READ:5.0]
> latency 95th percentile : 10.0 [READ:10.0]
> latency 99th percentile : 12.0 [READ:12.0]
> latency 99.9th percentile : 14.1 [READ:14.1]
> latency max : 26.5 [READ:26.5]
> Total partitions : 10000000 [READ:10000000]
> Total operation time : 00:09:26
>
>

10% is not a small improvement.

How stable are these results? Was the cpu pegged at 100%?  From the
latency, it looks like things are quite saturated.

17k ops/sec is quite low for a single core.


Duarte Nunes

<duarte@scylladb.com>
unread,
Dec 4, 2017, 7:30:50 AM12/4/17
to Avi Kivity, scylladb-dev@googlegroups.com
Hum.. not sure what's the difference between piece of data or fragment. It will be called, for example, for a cell name,
cell type, cell value (once for timestamp, ttl, value). Looking at the way the series ended up, I should just have
written a digester::hasher/algorithm() function.

Avi Kivity

<avi@scylladb.com>
unread,
Dec 4, 2017, 7:31:04 AM12/4/17
to Duarte Nunes, Paweł Dziepak, scylladb-dev
It works even better with a new file format. Instead of a large blob, store a blob pointer + hash in the btree index.

A hash of a result is the hash of the metadata + blobhash, so you don't need to follow the blob pointer. When compacting, you only need to follow the blob pointer for the latest result. Win+win!

(small blobs would be stored in the btree leaves).




[1] says "xxHash64 wins at larger (0.5KB+) data sizes, followed closely by 64 bit FarmHash and CityHash"

This is exactly what doesn't matter for us. For large blobs we will use precomputed hash so while we need good speed for that as well it is better to have the fastest algorithm for small values and "just" fast one for the big blobs. Another option would be to use different algorithms depending on the value size, but that will require exposing that threshold in the cluster, probably not a big problem though.

The latency hits this aims to solve are due to big values. For low values, latency will be dominated by something else. I don't think it's a good idea to be adding cluster features to define hash thresholds. 

I'd expect large blobs to be I/O and network bound, but perhaps there's a sour spot between small blobs and large blobs where the hash dominates.

To unsubscribe from this group and stop receiving emails from it, send an email to scylladb-dev...@googlegroups.com.

To post to this group, send email to scylla...@googlegroups.com.

Paweł Dziepak

<pdziepak@scylladb.com>
unread,
Dec 4, 2017, 7:32:09 AM12/4/17
to Duarte Nunes, scylladb-dev
I said this wrong. I meant caching hashes of large cells. The problem is right now we just call update() on the hasher for each piece of (meta)data in the result. That won't work if we want to reuse the cached hash. We we should do instead if to hash each cell separately and then feed that hash to the result hasher, so that the final digest is oblivious to the fact whether the cell hashes were cached or not.
 



[1] says "xxHash64 wins at larger (0.5KB+) data sizes, followed closely by 64 bit FarmHash and CityHash"

This is exactly what doesn't matter for us. For large blobs we will use precomputed hash so while we need good speed for that as well it is better to have the fastest algorithm for small values and "just" fast one for the big blobs. Another option would be to use different algorithms depending on the value size, but that will require exposing that threshold in the cluster, probably not a big problem though.

The latency hits this aims to solve are due to big values. For low values, latency will be dominated by something else. I don't think it's a good idea to be adding cluster features to define hash thresholds. 

If the big values are the main motivation of this patchset I'm surprised it doesn't include caching the hash.
We don't need agree on the thresholds at run-time. We just cannot easily change them (same way we cannot easily change the hash algorithm).
 

 

[2] says "So the fastest hash functions on x86_64 without quality problems are:

falkhash (macho64 and elf64 nasm only, with HW AES extension)
t1ha + mum (machine specific, mum: different arch results)
FarmHash (not portable, too machine specific: 64 vs 32bit, old gcc, ...)
Metro (but not 64crc yet, WIP)
Spooky32
xxHash64
fasthash
City (deprecated)"

I don't have much confidence in any of these results because the values of MB/s and cyclces/hash doesn't seem to make any sense (or a proper explaination is missing, either way, I'm reluctant to trust such data). CityHash is "deprecated" in a way that FarmHash is its successor. FarmHash seems to work on other platforms as well though it is not as fast (no idea how big the difference is). Metro looks quite promising as well.
Anyway, these links were probably the places were we should start research, not end it.

Metro hash seems stuck in 2015. When I google, xxHash seemed to have good adoption (supported in projects like OpenHFT). There's SeaHash too, which gets bonus points for being in rust.

What does it mean "stuck in 2015"?
 

Anyway, I don't really have the cycles to embark on a hash function research project.

Well, you already did.

Paweł Dziepak

<pdziepak@scylladb.com>
unread,
Dec 4, 2017, 7:36:23 AM12/4/17
to Duarte Nunes, scylladb-dev
Only if you don't manage to fit the logic in a generic lambda. On the plus side this class won't violate the single responsibility rule and won't have these unsafe switches all over the place.

Duarte Nunes

<duarte@scylladb.com>
unread,
Dec 4, 2017, 7:37:42 AM12/4/17
to Avi Kivity, scylladb-dev@googlegroups.com
Pretty stable, variance was low. Non-patch build runtime was always above 10minutes, patch build always below. I was
running 3 single-threaded Scylla's, for one ssd; I think that's why we see low throughput. I ran c-s on 12 cores, doing
CL.ALL requests (to trigger digest).

I can do a proper benchmark, this was meant to complement the hash algorithm micro-benchmark to show that the other
changes (namely the digester) did no harm.

Avi Kivity

<avi@scylladb.com>
unread,
Dec 4, 2017, 7:38:55 AM12/4/17
to Duarte Nunes, scylladb-dev@googlegroups.com
= piece of data.

Fragment would be, serialize everything up to some size to a blob, then
hash that. I'm not sure what would be faster.  update() probably aligns
and otherwise arranges things, then calls the hash on an integral number
of 128-byte (or whatever) blocks.

Serializing stuff exercises the memory more, but has less control flow
which is nasty for the CPU.


btw, we should also look at replacing murmur3 with xxhash, but that's a
much more significant change.

Duarte Nunes

<duarte@scylladb.com>
unread,
Dec 4, 2017, 7:39:23 AM12/4/17
to Avi Kivity, Paweł Dziepak, scylladb-dev
This I like!





[1] says "xxHash64 wins at larger (0.5KB+) data sizes, followed closely by 64 bit FarmHash and CityHash"

This is exactly what doesn't matter for us. For large blobs we will use precomputed hash so while we need good speed for that as well it is better to have the fastest algorithm for small values and "just" fast one for the big blobs. Another option would be to use different algorithms depending on the value size, but that will require exposing that threshold in the cluster, probably not a big problem though.

The latency hits this aims to solve are due to big values. For low values, latency will be dominated by something else. I don't think it's a good idea to be adding cluster features to define hash thresholds. 

I'd expect large blobs to be I/O and network bound, but perhaps there's a sour spot between small blobs and large blobs where the hash dominates.

They may be cached too. Network latency dominates, but at some point hash latency becomes really significant.

Avi Kivity

<avi@scylladb.com>
unread,
Dec 4, 2017, 7:41:49 AM12/4/17
to Duarte Nunes, scylladb-dev@googlegroups.com
Ah, CL=ALL ~triples the work (default is LOCAL_ONE).

Duarte Nunes

<duarte@scylladb.com>
unread,
Dec 4, 2017, 7:45:42 AM12/4/17
to Paweł Dziepak, scylladb-dev
Ah! I see it now. This should be easy to do, if I'm not missing anything. An aggregator would pass a new algorithm instance to each sub-object, and then merge the results. This is nice too since it could introduce some parallelism to the cpu pipeline, if it could exploit it.


 



[1] says "xxHash64 wins at larger (0.5KB+) data sizes, followed closely by 64 bit FarmHash and CityHash"

This is exactly what doesn't matter for us. For large blobs we will use precomputed hash so while we need good speed for that as well it is better to have the fastest algorithm for small values and "just" fast one for the big blobs. Another option would be to use different algorithms depending on the value size, but that will require exposing that threshold in the cluster, probably not a big problem though.

The latency hits this aims to solve are due to big values. For low values, latency will be dominated by something else. I don't think it's a good idea to be adding cluster features to define hash thresholds. 

If the big values are the main motivation of this patchset I'm surprised it doesn't include caching the hash.
We don't need agree on the thresholds at run-time. We just cannot easily change them (same way we cannot easily change the hash algorithm).

I wanted to cache the value, but I don't have cycles to dig into the row cache right now. You're right though that I didn't think ahead and realize that would also change the hash result and require another feature flag.


 

 

[2] says "So the fastest hash functions on x86_64 without quality problems are:

falkhash (macho64 and elf64 nasm only, with HW AES extension)
t1ha + mum (machine specific, mum: different arch results)
FarmHash (not portable, too machine specific: 64 vs 32bit, old gcc, ...)
Metro (but not 64crc yet, WIP)
Spooky32
xxHash64
fasthash
City (deprecated)"

I don't have much confidence in any of these results because the values of MB/s and cyclces/hash doesn't seem to make any sense (or a proper explaination is missing, either way, I'm reluctant to trust such data). CityHash is "deprecated" in a way that FarmHash is its successor. FarmHash seems to work on other platforms as well though it is not as fast (no idea how big the difference is). Metro looks quite promising as well.
Anyway, these links were probably the places were we should start research, not end it.

Metro hash seems stuck in 2015. When I google, xxHash seemed to have good adoption (supported in projects like OpenHFT). There's SeaHash too, which gets bonus points for being in rust.

What does it mean "stuck in 2015"?

I meant this: https://github.com/jandrewrogers/MetroHash/commits/master

(I guess it might not need updates or bug fixes after 2 years, and that would indeed make it an awesome piece of software).


 

Anyway, I don't really have the cycles to embark on a hash function research project.

Well, you already did.

I meant, a proper one. I didn't even wear a lab coat when running experiments!

Duarte Nunes

<duarte@scylladb.com>
unread,
Dec 4, 2017, 7:47:57 AM12/4/17
to Paweł Dziepak, scylladb-dev
That's true, I'll fix it. Will look better with just one function that returns the algorithm, so we don't always incur the  cost of the switches.

Gleb Natapov

<gleb@scylladb.com>
unread,
Dec 4, 2017, 7:48:17 AM12/4/17
to Duarte Nunes, scylladb-dev@googlegroups.com
On Thu, Nov 30, 2017 at 11:00:23PM +0000, Duarte Nunes wrote:
> Using a normal cassandra-stress run, one can already see a (small)
By normal you mean that you did not change what data c-s sends? By
default each raw c-s uses is pretty small, so I wonder where such a big
improvement is coming from?

--
Gleb.

Duarte Nunes

<duarte@scylladb.com>
unread,
Dec 4, 2017, 7:52:06 AM12/4/17
to Avi Kivity, scylladb-dev@googlegroups.com
I tried, it's an incomplete type. I don't think it's very large though, not sure why this is written in this style.

>
>
>> +
>> +    std::unique_ptr<XXH64_state_t, xxhash_state_deleter> _state;
>> +public:
>> +    xx_hasher() : _state(std::unique_ptr<XXH64_state_t, xxhash_state_deleter>(XXH64_createState())) {
>> +        XXH64_reset(_state.get(), 0);
>> +    }
>> +
>> +    xx_hasher(xx_hasher&&) noexcept = default;
>> +
>> +    xx_hasher& operator=(xx_hasher&&) noexcept = default;
>> +
>> +    xx_hasher(const xx_hasher& other) : xx_hasher() {
>> +        XXH64_copyState(_state.get(), other._state.get());
>> +    }
>> +
>> +    xx_hasher& operator=(const xx_hasher& other) {
>> +        if (this != &other) {
>> +            auto tmp = other;
>> +            this->~xx_hasher();
>> +            new (this) xx_hasher(std::move(tmp));
>> +        }
>
> Pawel ruined this style of assignop for me (even though this one probably doesn't invoke undefined behavior). Let's
> find an alternative idiomatic style. But for now this can stay.

That's #3032 :/

Paweł Dziepak

<pdziepak@scylladb.com>
unread,
Dec 4, 2017, 7:53:11 AM12/4/17
to Duarte Nunes, scylladb-dev
I'm OK with not caching the hashes yet, but I'd feel much more comfortable about this series if we didn't know that the digests needs another incompatible change in the future.

Duarte Nunes

<duarte@scylladb.com>
unread,
Dec 4, 2017, 7:53:11 AM12/4/17
to Avi Kivity, scylladb-dev@googlegroups.com
Still there would be an incompatibility: this one returns an array of 16, xxhash would return one of size 8. So I just
expose it as uint64_t and then put a wrapper on top of that. The thinking is that we might re-use it later for something
where the natural result is a numeral (like for murmur).

>
>
>>       }
>

Duarte Nunes

<duarte@scylladb.com>
unread,
Dec 4, 2017, 7:54:49 AM12/4/17
to Gleb Natapov, scylladb-dev@googlegroups.com
That's right. I think MD5 is really slow (see benchmark in issue) - I was going to try and change the default row and
cell size if I didn't see a difference, but I did, so I didn't pursue it further.

Avi Kivity

<avi@scylladb.com>
unread,
Dec 4, 2017, 7:55:37 AM12/4/17
to Duarte Nunes, scylladb-dev@googlegroups.com
I meant for the double-compile style which I neglected to mention.

Maybe we should standardize on array<uint8_t, 16> (and provide another
method with per-algorithm return type).

Paweł Dziepak

<pdziepak@scylladb.com>
unread,
Dec 4, 2017, 8:03:16 AM12/4/17
to Avi Kivity, Duarte Nunes, scylladb-dev
Copy-assignment is easy if the type is move-assignable and copy-constructible:

xx_hasher& operator=(const xx_hasher& other) {
    return operator=(xx_hasher(other));
}

 
+        return *this;
+    }
+
+    void update(const char* ptr, size_t length) {
+        XXH64_update(_state.get(), ptr, length);
+    }
+
+    uint64_t finalize() {
+        return XXH64_digest(_state.get());
+    }
+};

--
You received this message because you are subscribed to the Google Groups "ScyllaDB development" group.
To unsubscribe from this group and stop receiving emails from it, send an email to scylladb-dev+unsubscribe@googlegroups.com.
To post to this group, send email to scylla...@googlegroups.com.

Tomasz Grabiec

<tgrabiec@scylladb.com>
unread,
Dec 4, 2017, 8:04:30 AM12/4/17
to Paweł Dziepak, Duarte Nunes, scylladb-dev
If we change the format of atomic_cell to this:

     <live>  := <int8_t:flags><int64_t:timestamp>(<int32_t:expiry><int32_t:ttl>)?<hash>?<value>

where the presence of the hash is determined by the flags, then we only need to change appending_hash<atomic_cell_view> to pick it up.
 
We want to cache the hash not only in row cache, but in memtables too.

Duarte Nunes

<duarte@scylladb.com>
unread,
Dec 4, 2017, 8:06:51 AM12/4/17
to Avi Kivity, scylladb-dev@googlegroups.com
Sure, I can do that.

Duarte Nunes

<duarte@scylladb.com>
unread,
Dec 4, 2017, 8:14:25 AM12/4/17
to Tomasz Grabiec, Paweł Dziepak, scylladb-dev
This sounds good.

Avi Kivity

<avi@scylladb.com>
unread,
Dec 4, 2017, 8:19:23 AM12/4/17
to Paweł Dziepak, Duarte Nunes, scylladb-dev
Yup!

Avi Kivity

<avi@scylladb.com>
unread,
Dec 4, 2017, 8:23:01 AM12/4/17
to Duarte Nunes, Tomasz Grabiec, Paweł Dziepak, scylladb-dev
Perhaps hash-of-hash should be decoupled from the algorithm used.


Duarte Nunes

<duarte@scylladb.com>
unread,
Dec 4, 2017, 8:53:18 AM12/4/17
to Avi Kivity, Tomasz Grabiec, Paweł Dziepak, scylladb-dev
That would be just XORing, right?

Avi Kivity

<avi@scylladb.com>
unread,
Dec 4, 2017, 8:55:58 AM12/4/17
to Duarte Nunes, Tomasz Grabiec, Paweł Dziepak, scylladb-dev
No, merkle-tree style. We lay out the data, then the hash of the blob, then more data, and hash the whole thing.

(Merkle only hashes other hashes, we combine data and hashes).

xoring is vulnerable to blobs being switched.


Tomasz Grabiec

<tgrabiec@scylladb.com>
unread,
Dec 4, 2017, 9:32:44 AM12/4/17
to Avi Kivity, Duarte Nunes, Paweł Dziepak, scylladb-dev
I don't understand why we need a merkle tree. Why not just always feed the hasher with the hash of the value, instead of the named value like currently?

Tomasz Grabiec

<tgrabiec@scylladb.com>
unread,
Dec 4, 2017, 9:33:06 AM12/4/17
to Avi Kivity, Duarte Nunes, Paweł Dziepak, scylladb-dev
s/named/naked/ 

Duarte Nunes

<duarte@scylladb.com>
unread,
Dec 4, 2017, 9:35:32 AM12/4/17
to Avi Kivity, Tomasz Grabiec, Paweł Dziepak, scylladb-dev
I don't see it then. The flow I have in mind is:

- instantiate a hash algorithm, start feeding it data;
- at the cell level
  - if a given feature flag is disabled, feed the cell value to the algorithm
  - else, if the feature flag is enabled
     - fetch the cached hash, and feed it to the running hash
     - if there's no cached hash
       - instantiate a new algorithm
       - hash the value, feed it to the running hash
       - store it

We would use the same algorithm for everything though.

Avi Kivity

<avi@scylladb.com>
unread,
Dec 5, 2017, 6:49:58 AM12/5/17
to Tomasz Grabiec, Duarte Nunes, Paweł Dziepak, scylladb-dev
That's what a Merkle tree is. A hash of hashes.

Usually a Merkle tree has fixed depth (so the value leaves are all at the same depth), in this case they aren't.

Avi Kivity

<avi@scylladb.com>
unread,
Dec 5, 2017, 6:51:36 AM12/5/17
to Duarte Nunes, Tomasz Grabiec, Paweł Dziepak, scylladb-dev
If you use a good algorithm, there is no vulnerability. If the top level algorithm is a XOR (referring to your N-3 comment), then it is vulnerable.

Duarte Nunes

<duarte@scylladb.com>
unread,
Dec 5, 2017, 7:02:45 AM12/5/17
to Avi Kivity, Tomasz Grabiec, Paweł Dziepak, scylladb-dev
I always meant to use xxhash for everything (work conservation law), but your decoupling comment confused me.

Avi Kivity

<avi@scylladb.com>
unread,
Dec 5, 2017, 7:05:36 AM12/5/17
to Duarte Nunes, Tomasz Grabiec, Paweł Dziepak, scylladb-dev
Sorry. What I meant was that changing the hash algorithm is one thing, and using hashes to represent values is another, and those two things are independent.



Duarte Nunes

<duarte@scylladb.com>
unread,
Jan 4, 2018, 3:16:35 PM1/4/18
to Avi Kivity, Tomasz Grabiec, Paweł Dziepak, scylladb-dev
On 04/12/2017 13:22, Avi Kivity wrote:
So, I'm trying to do this idea of caching the hash in the cell, but there's a layer disconnect: when we know we need the hash, we're already working on copy of the cell. I see two approaches: always calculate the hash on cache/memtable hit, which can translate into a lot of wasted work; or propagate down an argument specifying the upper layer is interested in the hash, which isn't fun.

It's a pity we don't have mutation adaptors that could work on the source bytes :/

Avi Kivity

<avi@scylladb.com>
unread,
Jan 6, 2018, 2:48:44 PM1/6/18
to Duarte Nunes, Tomasz Grabiec, Paweł Dziepak, scylladb-dev
Can't it be part of partition_slice?    That gets propagated everywhere.

Duarte Nunes

<duarte@scylladb.com>
unread,
Jan 7, 2018, 4:43:17 PM1/7/18
to Avi Kivity, Tomasz Grabiec, Paweł Dziepak, scylladb-dev
It can, that's what I ended up doing, but looks like a hack :(

Avi Kivity

<avi@scylladb.com>
unread,
Jan 8, 2018, 6:42:40 AM1/8/18
to Duarte Nunes, Tomasz Grabiec, Paweł Dziepak, scylladb-dev


On 01/07/2018 11:43 PM, Duarte Nunes wrote:
>
>>>>
>>>
>>> So, I'm trying to do this idea of caching the hash in the cell, but
>>> there's a layer disconnect: when we know we need the hash, we're
>>> already working on copy of the cell. I see two approaches: always
>>> calculate the hash on cache/memtable hit, which can translate into a
>>> lot of wasted work; or propagate down an argument specifying the
>>> upper layer is interested in the hash, which isn't fun.
>>>
>>
>> Can't it be part of partition_slice?    That gets propagated everywhere.
>
> It can, that's what I ended up doing, but looks like a hack :(
>

You have not yet learned the art of justifying hacks, I see.

Duarte Nunes

<duarte@scylladb.com>
unread,
Jan 8, 2018, 9:44:53 AM1/8/18
to Avi Kivity, Tomasz Grabiec, Paweł Dziepak, scylladb-dev
I've since taken my medication and from this blissful state, it now looks sublime.

Avi Kivity

<avi@scylladb.com>
unread,
Jan 9, 2018, 3:26:04 AM1/9/18
to Duarte Nunes, Tomasz Grabiec, Paweł Dziepak, scylladb-dev
Remember to share when we meet again.

Tomasz Grabiec

<tgrabiec@scylladb.com>
unread,
Jan 9, 2018, 3:40:02 AM1/9/18
to Duarte Nunes, Avi Kivity, Paweł Dziepak, scylladb-dev
How would such adaptors help? 

Asias He

<asias@scylladb.com>
unread,
Jan 11, 2018, 12:38:12 AM1/11/18
to Duarte Nunes, scylladb-dev
Hello Duarte

Using a normal cassandra-stress run, one can already see a (small)
improvement. With MD5:

Results:
op rate                   : 16007 [READ:16007]
latency mean              : 6.2 [READ:6.2]
latency median            : 5.3 [READ:5.3]
latency 95th percentile   : 11.6 [READ:11.6]
latency 99th percentile   : 13.7 [READ:13.7]
latency 99.9th percentile : 16.0 [READ:16.0]
latency max               : 27.5 [READ:27.5]
Total partitions          : 10000000 [READ:10000000]
Total operation time      : 00:10:24

With xxHash:

Results:
op rate                   : 17643 [READ:17643]
latency mean              : 5.7 [READ:5.7]
latency median            : 5.0 [READ:5.0]
latency 95th percentile   : 10.0 [READ:10.0]
latency 99th percentile   : 12.0 [READ:12.0]
latency 99.9th percentile : 14.1 [READ:14.1]
latency max               : 26.5 [READ:26.5]
Total partitions          : 10000000 [READ:10000000]
Total operation time      : 00:09:26

Fixes #2884


I played a bit with the xxhash in this series. 

I tried two row sizes.

cassandra-stress write n=1000000 cl=ONE -schema 'replication(factor=2)' keyspace=ks1024 -col 'size=FIXED(1024) n=FIXED(4)' -pop seq=1..10000000  -rate threads=200 limit=30000/s -node 127.0.0.1
cassandra-stress write n=100000   cl=ONE -schema 'replication(factor=2)' keyspace=ks4096 -col 'size=FIXED(4096) n=FIXED(4)' -pop seq=1..10000000  -rate threads=200 limit=30000/s -node 127.0.0.1

I see no difference between the xxhash and fnv1a_hasher which is supposed to be slow. 142MiB/s for 1024 size. 134 MiB/s for 4096 size. This includes both reading data from disk and calculating the hash. 

xxhash 1024

INFO  2018-01-11 13:24:25,534 [shard 0] repair - repair 1 on shard 0 stats: round_nr=2004, rpc_call_nr=2018, tx_hashes_nr=0, rx_hashes_nr=0, tx_row_nr=0, rx_row_nr=0, tx_row_bytes=0, rx_row_bytes=0, row_from_disk_bytes={{127.0.0.1, 2099034960}, {127.0.0.2, 2099034960}}, row_from_disk_nr={{127.0.0.1, 474680}, {127.0.0.2, 474680}}, duration=14.08

seconds, row_from_disk_bytes_per_sec={{127.0.0.1, 142.173}, {127.0.0.2, 142.173}} MiB/s, row_from_disk_rows_per_sec={{127.0.0.1, 33713.1}, {127.0.0.2, 33713.1}} Rows/s


fnv1a_hasher 1024

INFO  2018-01-11 13:28:17,192 [shard 0] repair - repair 1 on shard 0 stats: round_nr=2004, rpc_call_nr=2018, tx_hashes_nr=0, rx_hashes_nr=0, tx_row_nr=0, rx_row_nr=0, tx_row_bytes=0, rx_row_bytes=0, row_from_disk_bytes={{127.0.0.1, 2099034960}, {127.0.0.2, 2099034960}}, row_from_disk_nr={{127.0.0.1, 474680}, {127.0.0.2, 474680}}, duration=14.004 seconds, row_from_disk_bytes_per_sec={{127.0.0.1, 142.945}, {127.0.0.2, 142.945}} MiB/s, row_from_disk_rows_per_sec={{127.0.0.1, 33896}, {127.0.0.2, 33896}} Rows/s



xxhash 4096

INFO  2018-01-11 13:25:10,052 [shard 0] repair - repair 2 on shard 0 stats: round_nr=1598, rpc_call_nr=1612, tx_hashes_nr=0, rx_hashes_nr=0, tx_row_nr=0, rx_row_nr=0, tx_row_bytes=0, rx_row_bytes=0, row_from_disk_bytes={{127.0.0.1, 1671000000}, {127.0.0.2, 1671000000}}, row_from_disk_nr={{127.0.0.1, 100000}, {127.0.0.2, 100000}}, duration=11.875 seconds, row_from_disk_bytes_per_sec={{127.0.0.1, 134.197}, {127.0.0.2, 134.197}} MiB/s, row_from_disk_rows_per_sec={{127.0.0.1, 8421.05}, {127.0.0.2, 8421.05}} Rows/s


fnv1a_hasher 4096

INFO  2018-01-11 13:28:59,011 [shard 0] repair - repair 2 on shard 0 stats: round_nr=1598, rpc_call_nr=1612, tx_hashes_nr=0, rx_hashes_nr=0, tx_row_nr=0, rx_row_nr=0, tx_row_bytes=0, rx_row_bytes=0, row_from_disk_bytes={{127.0.0.1, 1671000000}, {127.0.0.2, 1671000000}}, row_from_disk_nr={{127.0.0.1, 100000}, {127.0.0.2, 100000}}, duration=11.829 seconds, row_from_disk_bytes_per_sec={{127.0.0.1, 134.719}, {127.0.0.2, 134.719}} MiB/s, row_from_disk_rows_per_sec={{127.0.0.1, 8453.8}, {127.0.0.2, 8453.8}} Rows/s


 

Also in:
    Also in:
    g...@github.com:duarten/scylla.git  xxhash/v1
    https://github.com/duarten/scylla/tree/xxhash/v1

Sad note: I wish I knew how to configure cassandra-stress to use larger
values.

Duarte Nunes (15):
  Add xxhash (fast non-cryptographic hash) as submodule
  configure.py: Build xxhash
  CMakeLists: Add xxhash directory
  digest: Introduce xxHash hash algorithm
  digest_algorithm: Add xxHash option
  md5_hasher: Extract hash size
  query: Add class to encapsulate digest algorithm
  query-result: Introduce class digester
  query-result: Use digester instead of md5_hasher
  storage_proxy: Extract decision about digest algorithm to use
  message/messaging_service: Specify algorithm when requesting digest
  service/storage_service: Add and use xxhash feature
  schema: Remove unneeded include
  tests/mutation_test: Test xx_hasher alongside md5_hasher
  tests/mutation_test: Use xxHash instead of MD5 for some tests

 configure.py                 |  48 +++++++++---
 database.hh                  |   5 +-
 digest_algorithm.hh          |   5 +-
 digester.hh                  | 173 +++++++++++++++++++++++++++++++++++++++++++
 idl/query.idl.hh             |   3 +-
 md5_hasher.hh                |   8 +-
 message/messaging_service.hh |   4 +-
 mutation.hh                  |   4 +-
 mutation_query.hh            |   2 +-
 query-result-writer.hh       |  17 +++--
 query-result.hh              |  18 ++++-
 service/storage_proxy.hh     |   3 +-
 service/storage_service.hh   |   6 ++
 xx_hasher.hh                 |  63 ++++++++++++++++
 database.cc                  |  16 ++--
 message/messaging_service.cc |   6 +-
 mutation.cc                  |   8 +-
 mutation_partition.cc        |   2 +-
 mutation_query.cc            |   4 +-
 query-result-set.cc          |   2 +-
 schema.cc                    |   1 -
 service/storage_proxy.cc     |  56 +++++++-------
 service/storage_service.cc   |   3 +
 tests/database_test.cc       |   6 +-
 tests/memory_footprint.cc    |   2 +-
 tests/mutation_test.cc       |  44 ++++++-----
 .gitmodules                  |   3 +
 CMakeLists.txt               |   1 +
 xxHash                       |   1 +
 29 files changed, 410 insertions(+), 104 deletions(-)
 create mode 100644 digester.hh
 create mode 100644 xx_hasher.hh
 create mode 160000 xxHash

--
2.15.0


--
You received this message because you are subscribed to the Google Groups "ScyllaDB development" group.
To unsubscribe from this group and stop receiving emails from it, send an email to scylladb-dev+unsubscribe@googlegroups.com.
To post to this group, send email to scylla...@googlegroups.com.

For more options, visit https://groups.google.com/d/optout.



--
Asias

Duarte Nunes

<duarte@scylladb.com>
unread,
Jan 11, 2018, 5:41:00 AM1/11/18
to Asias He, scylladb-dev
How do you trigger digest calculation with cl=ONE?

Asias He

<asias@scylladb.com>
unread,
Jan 11, 2018, 5:55:17 AM1/11/18
to Duarte Nunes, scylladb-dev
The above is used to generate data for repair. It has nothing to do with digest. I am benchmarking the hash speed in repair. 



--
Asias

Duarte Nunes

<duarte@scylladb.com>
unread,
Jan 11, 2018, 6:04:52 AM1/11/18
to Asias He, scylladb-dev
Maybe the throughput is dominated by something else, I didn't really play with repair; you can try to benchmark xx_hash in isolation against fnv1a_hasher.

Duarte Nunes

<duarte@scylladb.com>
unread,
Jan 12, 2018, 9:18:47 AM1/12/18
to Avi Kivity, Tomasz Grabiec, Paweł Dziepak, scylladb-dev
Update: As I'm knee-deep into shoving the hash into a multi-cell, fragmented buffer, which needs to be written byte-by-byte, it dawns on me that this is probably not the best strategy, and I should just store the caches hash in a buffer at the row level (amenable to small buffer optimization). I'll be doing that instead.

Asias He

<asias@scylladb.com>
unread,
Jan 15, 2018, 12:52:13 AM1/15/18
to Duarte Nunes, scylladb-dev
I did another test, which makes the hash does nothing but return a zero. The bandwidth (no hash)  for 1024 size is 183MiB/s for 4096 size is 140MiB/s. xxhash and fnv1a get 142MiB/s for 1024 size, get 134MiB/s for 4096 size. For the 4096 size, the limitation is at the mutation reader. For the 1024 size, the hash operation makes the bandwidth drops around 183 - 142 = 40 MiB/s.  

nohash 1024

INFO  2018-01-12 09:31:59,503 [shard 0] repair - repair 1 on shard 0 stats: round_nr=2004, rpc_call_nr=2018, tx_hashes_nr=0, rx_hashes_nr=0, tx_row_nr=0, rx_row_nr=0, tx_row_bytes=0, rx_row_bytes=0, row_from_disk_bytes={{127.0.0.1, 2099034960}, {127.0.0.2, 2099034960}}, row_from_disk_nr={{127.0.0.1, 474680}, {127.0.0.2, 474680}}, duration=10.935 seconds, row_from_disk_bytes_per_sec={{127.0.0.1, 183.063}, {127.0.0.2, 183.063}} MiB/s, row_from_disk_rows_per_sec={{127.0.0.1, 43409.2}, {127.0.0.2, 43409.2}} Rows/s


nohash 4096

INFO  2018-01-12 09:33:28,596 [shard 0] repair - repair 2 on shard 0 stats: round_nr=1598, rpc_call_nr=1612, tx_hashes_nr=0, rx_hashes_nr=0, tx_row_nr=0, rx_row_nr=0, tx_row_bytes=0, rx_row_bytes=0, row_from_disk_bytes={{127.0.0.1, 1671000000}, {127.0.0.2, 1671000000}}, row_from_disk_nr={{127.0.0.1, 100000}, {127.0.0.2, 100000}}, duration=11.372 seconds, row_from_disk_bytes_per_sec={{127.0.0.1, 140.133}, {127.0.0.2, 140.133}} MiB/s, row_from_disk_rows_per_sec={{127.0.0.1, 8793.53}, {127.0.0.2, 8793.53}} Rows/s



Btw, the md5 hash is 128 bits, in this patch set, 64 bit version of xxhash is used to benchmark. Isn't this an unfair comparison? 



--
Asias

Duarte Nunes

<duarte@scylladb.com>
unread,
Jan 15, 2018, 4:46:46 AM1/15/18
to Asias He, scylladb-dev
Why is that relevant?

Asias He

<asias@scylladb.com>
unread,
Jan 15, 2018, 4:55:57 AM1/15/18
to Duarte Nunes, scylladb-dev
To generate a 1000000 bit hash uses more cpu than 1 bit hash.



--
Asias

Duarte Nunes

<duarte@scylladb.com>
unread,
Jan 15, 2018, 5:34:58 AM1/15/18
to Asias He, scylladb-dev
Depends on register size. Still, it doesn't matter, there's no 128 bit version of xxhash yet.

Duarte Nunes

<duarte@scylladb.com>
unread,
Jan 16, 2018, 9:35:57 AM1/16/18
to scylladb-dev@googlegroups.com
Note: Don't merge yet. Still writting tests and benchmarks. I can no
longer repro the speedup over 100% I got over md5 - not sure why that
is. Now xxhash is only like 50% faster than md5.

The MD5 hash function has proved to be slow for large cell values:

size = 256; elapsed = 4us
size = 512; elapsed = 8us
size = 1024; elapsed = 14us
size = 2048; elapsed = 21us
size = 4096; elapsed = 33us
size = 8192; elapsed = 51us
size = 16384; elapsed = 86us
size = 32768; elapsed = 150us
size = 65536; elapsed = 278us
size = 131072; elapsed = 531us
size = 262144; elapsed = 1032us
size = 524288; elapsed = 2026us
size = 1048576; elapsed = 4004us
size = 2097152; elapsed = 7943us
size = 4194304; elapsed = 15800us
size = 8388608; elapsed = 31731us
size = 16777216; elapsed = 64681us
size = 33554432; elapsed = 130752us
size = 67108864; elapsed = 263154us

The xxHash is a non-cryptographic, 64bit (there's work in progress on
the 128 version) hash that can be used to replace MD5. It performs much
better:

size = 256; elapsed = 2us
size = 512; elapsed = 1us
size = 1024; elapsed = 1us
size = 2048; elapsed = 2us
size = 4096; elapsed = 2us
size = 8192; elapsed = 3us
size = 16384; elapsed = 5us
size = 32768; elapsed = 8us
size = 65536; elapsed = 14us
size = 131072; elapsed = 28us
size = 262144; elapsed = 59us
size = 524288; elapsed = 116us
size = 1048576; elapsed = 226us
size = 2097152; elapsed = 456us
size = 4194304; elapsed = 935us
size = 8388608; elapsed = 1848us
size = 16777216; elapsed = 4723us
size = 33554432; elapsed = 10507us
size = 67108864; elapsed = 21622us

Using a normal cassandra-stress run, one can already see a (small)
improvement. With MD5:

Results:
op rate : 16007 [READ:16007]
latency mean : 6.2 [READ:6.2]
latency median : 5.3 [READ:5.3]
latency 95th percentile : 11.6 [READ:11.6]
latency 99th percentile : 13.7 [READ:13.7]
latency 99.9th percentile : 16.0 [READ:16.0]
latency max : 27.5 [READ:27.5]
Total partitions : 10000000 [READ:10000000]
Total operation time : 00:10:24

With xxHash:

Results:
op rate : 17643 [READ:17643]
latency mean : 5.7 [READ:5.7]
latency median : 5.0 [READ:5.0]
latency 95th percentile : 10.0 [READ:10.0]
latency 99th percentile : 12.0 [READ:12.0]
latency 99.9th percentile : 14.1 [READ:14.1]
latency max : 26.5 [READ:26.5]
Total partitions : 10000000 [READ:10000000]
Total operation time : 00:09:26

Fixes #2884

Also in:
Also in:
g...@github.com:duarten/scylla.git xxhash/v2
https://github.com/duarten/scylla/tree/xxhash/v2

v2:
- Cached cell hash at the row level,
- digester based on variant,
- Less branching on the digest algorithm by way of double dispatch.

Duarte Nunes (25):
Add xxhash (fast non-cryptographic hash) as submodule
configure.py: Build xxhash
CMakeLists: Add xxhash directory
digest: Introduce xxHash hash algorithm
digest_algorithm: Add xxHash option
md5_hasher: Extract hash size
query: Add class to encapsulate digest algorithm
query-result: Introduce class result_options
keys: Replace feed_hash() member function with appending_hash
range_tombstone: Replace feed_hash() member function with
appending_hash
query-result: Use digester instead of md5_hasher
atomic_cell_hash: Add specialization for atomic_cell_or_collection
mutation_partition: Allow caching cell hashes
mutation_partition: Replace hash_row_slice with appending_hash
row: Use cached hash for hash calculation
query::partition_slice: Add option to specify when digest is requested
partition_snapshot_reader: Pre-calculate cell hash
cache_flat_mutation_reader: Pre-calculate cell hash
storage_proxy: Extract decision about digest algorithm to use
message/messaging_service: Specify algorithm when requesting digest
service/storage_service: Add and use xxhash feature
schema: Remove unneeded include
tests/mutation_test: Test xx_hasher alongside md5_hasher
tests/mutation_test: Use xxHash instead of MD5 for some tests
service/storage_proxy: Enable hash caching

configure.py | 48 ++++++++---
atomic_cell_hash.hh | 13 +++
atomic_cell_or_collection.hh | 8 --
cache_flat_mutation_reader.hh | 10 ++-
database.hh | 5 +-
digest_algorithm.hh | 3 +-
digester.hh | 70 +++++++++++++++++
hashing_partition_visitor.hh | 4 +-
idl/query.idl.hh | 3 +-
keys.hh | 47 ++++++++---
md5_hasher.hh | 8 +-
message/messaging_service.hh | 4 +-
mutation.hh | 6 +-
mutation_partition.hh | 85 ++++++++++++++++----
mutation_query.hh | 2 +-
partition_snapshot_reader.hh | 21 +++--
partition_snapshot_row_cursor.hh | 10 ++-
partition_version.hh | 2 +-
position_in_partition.hh | 2 +-
query-request.hh | 5 +-
query-result-writer.hh | 19 ++---
query-result.hh | 18 ++++-
range_tombstone.hh | 28 ++++---
read_context.hh | 1 +
service/storage_proxy.hh | 3 +-
service/storage_service.hh | 6 ++
xx_hasher.hh | 82 +++++++++++++++++++
database.cc | 16 ++--
memtable.cc | 5 +-
message/messaging_service.cc | 6 +-
mutation.cc | 8 +-
mutation_partition.cc | 166 ++++++++++++++++++++++++++-------------
mutation_query.cc | 4 +-
partition_version.cc | 9 ++-
query-result-set.cc | 2 +-
repair/repair.cc | 11 +--
schema.cc | 1 -
service/storage_proxy.cc | 58 +++++++-------
service/storage_service.cc | 3 +
tests/database_test.cc | 6 +-
tests/memory_footprint.cc | 2 +-
tests/mutation_test.cc | 44 ++++++-----
tests/mvcc_test.cc | 2 +-
.gitmodules | 3 +
CMakeLists.txt | 1 +
licenses/xxhash-license.txt | 24 ++++++
xxHash | 1 +
47 files changed, 651 insertions(+), 234 deletions(-)
create mode 100644 digester.hh
create mode 100644 xx_hasher.hh
create mode 100644 licenses/xxhash-license.txt
create mode 160000 xxHash

--
2.15.1

Duarte Nunes

<duarte@scylladb.com>
unread,
Jan 16, 2018, 9:35:58 AM1/16/18
to scylladb-dev@googlegroups.com
Signed-off-by: Duarte Nunes <dua...@scylladb.com>

Note:
xxhash repo should be cloned to Scylla organization, and that
git url should be used instead.
---
.gitmodules | 3 +++
licenses/xxhash-license.txt | 24 ++++++++++++++++++++++++
xxHash | 1 +
3 files changed, 28 insertions(+)
create mode 100644 licenses/xxhash-license.txt
create mode 160000 xxHash

diff --git a/.gitmodules b/.gitmodules
index 18e14ee560..6af0a36401 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -9,3 +9,6 @@
[submodule "dist/ami/files/scylla-ami"]
path = dist/ami/files/scylla-ami
url = ../scylla-ami
+[submodule "xxHash"]
+ path = xxHash
+ url = g...@github.com:Cyan4973/xxHash.git
diff --git a/licenses/xxhash-license.txt b/licenses/xxhash-license.txt
new file mode 100644
index 0000000000..7de801ed1b
--- /dev/null
+++ b/licenses/xxhash-license.txt
@@ -0,0 +1,24 @@
+xxHash Library
+Copyright (c) 2012-2014, Yann Collet
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without modification,
+are permitted provided that the following conditions are met:
+
+* Redistributions of source code must retain the above copyright notice, this
+ list of conditions and the following disclaimer.
+
+* Redistributions in binary form must reproduce the above copyright notice, this
+ list of conditions and the following disclaimer in the documentation and/or
+ other materials provided with the distribution.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
+ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/xxHash b/xxHash
new file mode 160000
index 0000000000..744892b802
--- /dev/null
+++ b/xxHash
@@ -0,0 +1 @@
+Subproject commit 744892b802dcf61a78a3f2f1311d542577c16d66
--
2.15.1

Duarte Nunes

<duarte@scylladb.com>
unread,
Jan 16, 2018, 9:36:00 AM1/16/18
to scylladb-dev@googlegroups.com
Signed-off-by: Duarte Nunes <dua...@scylladb.com>
---
configure.py | 48 +++++++++++++++++++++++++++++++++++++++---------
1 file changed, 39 insertions(+), 9 deletions(-)

diff --git a/configure.py b/configure.py
index 2bd0f2e76e..e70124cae5 100755
--- a/configure.py
+++ b/configure.py
@@ -27,6 +27,8 @@ tempfile.tempdir = "./build/tmp"

configure_args = str.join(' ', [shlex.quote(x) for x in sys.argv[1:]])

+srcdir = os.getcwd()
+
for line in open('/etc/os-release'):
key, _, value = line.partition('=')
value = value.strip().strip('"')
@@ -188,12 +190,14 @@ modes = {
'sanitize_libs': '-lasan -lubsan',
'opt': '-O0 -DDEBUG -DDEBUG_SHARED_PTR -DDEFAULT_ALLOCATOR',
'libs': '',
+ 'xxhash_opts': '-DBUILD_SHARED_LIBS=OFF -DBUILD_XXHSUM=ON',
},
'release': {
'sanitize': '',
'sanitize_libs': '',
'opt': '-O2',
'libs': '',
+ 'xxhash_opts': '-DBUILD_SHARED_LIBS=OFF -DBUILD_XXHSUM=ON',
},
}

@@ -905,6 +909,22 @@ libs = ' '.join(['-lyaml-cpp', '-llz4', '-lz', '-lsnappy', pkg_config("--libs",
maybe_static(args.staticboost, '-lboost_date_time'),
])

+# xxhash lib
+xxhash_dir = 'xxHash'
+xxhash_lib = 'xxhash-scylla'
+xxhash_src_lib = xxhash_dir + '/libxxhash.a'
+
+if not os.path.exists(xxhash_dir) or not os.listdir(xxhash_dir):
+ raise Exception(xxhash_dir + ' is empty. Run "git submodule update --init".')
+
+xxhash_sources = []
+for root, dirs, files in os.walk(xxhash_dir):
+ xxhash_sources += [os.path.join(root, file)
+ for file in files
+ if file.endswith('.h') or file.endswith('.c')]
+xxhash_sources = ' '.join(xxhash_sources)
+libs += ' -l' + xxhash_lib
+
if not args.staticboost:
args.user_cflags += ' -DBOOST_TEST_DYN_LINK'

@@ -963,17 +983,17 @@ with open(buildfile, 'w') as f:
for mode in build_modes:
modeval = modes[mode]
f.write(textwrap.dedent('''\
- cxxflags_{mode} = -I. -I $builddir/{mode}/gen -I seastar -I seastar/build/{mode}/gen
+ cxxflags_{mode} = -I. -I $builddir/{mode}/gen -I seastar -I seastar/build/{mode}/gen -I$full_builddir/{mode}/xxhash
rule cxx.{mode}
command = $cxx -MD -MT $out -MF $out.d {seastar_cflags} $cxxflags $cxxflags_{mode} $obj_cxxflags -c -o $out $in
description = CXX $out
depfile = $out.d
rule link.{mode}
- command = $cxx $cxxflags_{mode} {sanitize_libs} $ldflags {seastar_libs} -o $out $in $libs $libs_{mode}
+ command = $cxx $cxxflags_{mode} {sanitize_libs} -L$builddir/{mode} $ldflags {seastar_libs} -o $out $in $libs $libs_{mode}
description = LINK $out
pool = link_pool
rule link_stripped.{mode}
- command = $cxx $cxxflags_{mode} -s {sanitize_libs} $ldflags {seastar_libs} -o $out $in $libs $libs_{mode}
+ command = $cxx $cxxflags_{mode} -s {sanitize_libs} -L$builddir/{mode} $ldflags {seastar_libs} -o $out $in $libs $libs_{mode}
description = LINK (stripped) $out
pool = link_pool
rule ar.{mode}
@@ -996,6 +1016,15 @@ with open(buildfile, 'w') as f:
build/{mode}/gen/${{stem}}Parser.cpp
description = ANTLR3 $in
''').format(mode = mode, **modeval))
+ f.write(textwrap.dedent('''\
+ rule xxhashmake_{mode}
+ command = make -C $builddir/{mode}/{xxhash_dir} CC={args.cc}
+ rule xxhashcmake_{mode}
+ command = mkdir -p $builddir/{mode}/{xxhash_dir} && cd $builddir/{mode}/{xxhash_dir} && CC={args.cc} cmake {xxhash_opts} {srcdir}/$in/cmake_unofficial
+ build $builddir/{mode}/{xxhash_dir}/Makefile : xxhashcmake_{mode} {xxhash_dir}
+ build $builddir/{mode}/{xxhash_src_lib} : xxhashmake_{mode} $builddir/{mode}/{xxhash_dir}/Makefile | {xxhash_sources}
+ build $builddir/{mode}/lib{xxhash_lib}.a : copy $builddir/{mode}/{xxhash_src_lib}
+ ''').format(xxhash_opts=(modeval['xxhash_opts']), **globals()))
f.write('build {mode}: phony {artifacts}\n'.format(mode = mode,
artifacts = str.join(' ', ('$builddir/' + mode + '/' + x for x in build_artifacts))))
compiles = {}
@@ -1022,6 +1051,10 @@ with open(buildfile, 'w') as f:
if binary.endswith('.a'):
f.write('build $builddir/{}/{}: ar.{} {}\n'.format(mode, binary, mode, str.join(' ', objs)))
else:
+ libdeps = str.join(' ', [
+ 'seastar/build/{mode}/libseastar.a'.format(mode = mode),
+ '$builddir/{mode}/lib{xxhash_lib}.a'.format(**globals()),
+ ])
if binary.startswith('tests/'):
local_libs = '$libs'
if binary not in tests_not_using_seastar_test_framework or binary in pure_boost_tests:
@@ -1033,15 +1066,12 @@ with open(buildfile, 'w') as f:
# So we strip the tests by default; The user can very
# quickly re-link the test unstripped by adding a "_g"
# to the test name, e.g., "ninja build/release/testname_g"
- f.write('build $builddir/{}/{}: {}.{} {} {}\n'.format(mode, binary, tests_link_rule, mode, str.join(' ', objs),
- 'seastar/build/{}/libseastar.a'.format(mode)))
+ f.write('build $builddir/{}/{}: {}.{} {} | {}\n'.format(mode, binary, tests_link_rule, mode, str.join(' ', objs), libdeps))
f.write(' libs = {}\n'.format(local_libs))
- f.write('build $builddir/{}/{}_g: link.{} {} {}\n'.format(mode, binary, mode, str.join(' ', objs),
- 'seastar/build/{}/libseastar.a'.format(mode)))
+ f.write('build $builddir/{}/{}_g: link.{} {} | {}\n'.format(mode, binary, mode, str.join(' ', objs), libdeps))
f.write(' libs = {}\n'.format(local_libs))
else:
- f.write('build $builddir/{}/{}: link.{} {} {}\n'.format(mode, binary, mode, str.join(' ', objs),
- 'seastar/build/{}/libseastar.a'.format(mode)))
+ f.write('build $builddir/{}/{}: link.{} {} | {}\n'.format(mode, binary, mode, str.join(' ', objs), libdeps))
if has_thrift:
f.write(' libs = {} {} $libs\n'.format(thrift_libs, maybe_static(args.staticboost, '-lboost_system')))
for src in srcs:
--
2.15.1

Duarte Nunes

<duarte@scylladb.com>
unread,
Jan 16, 2018, 9:36:01 AM1/16/18
to scylladb-dev@googlegroups.com
Signed-off-by: Duarte Nunes <dua...@scylladb.com>
---
CMakeLists.txt | 1 +
1 file changed, 1 insertion(+)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index d4c96877b7..d4c649675b 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -137,4 +137,5 @@ target_include_directories(scylla PUBLIC
${SEASTAR_DPDK_INCLUDE_DIRS}
${SEASTAR_INCLUDE_DIRS}
${Boost_INCLUDE_DIRS}
+ xxhash
build/release/gen)
--
2.15.1

Duarte Nunes

<duarte@scylladb.com>
unread,
Jan 16, 2018, 9:36:01 AM1/16/18
to scylladb-dev@googlegroups.com
This patch introduces xx_hasher, a class conforming to the Hasher
concept, which will be used to calculate the data digest in subsequent
patches. It is expected to be an order of magnitude faster than md5.

We use the 64 bit variant of the algorithm, the 128 bit one still
being under development.

Signed-off-by: Duarte Nunes <dua...@scylladb.com>
---
xx_hasher.hh | 82 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 82 insertions(+)
create mode 100644 xx_hasher.hh

diff --git a/xx_hasher.hh b/xx_hasher.hh
new file mode 100644
index 0000000000..1d09ee057f
--- /dev/null
+++ b/xx_hasher.hh
@@ -0,0 +1,82 @@
+/*
+ * Copyright (C) 2018 ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include "bytes.hh"
+#include "utils/serialization.hh"
+
+#include <xxHash/xxhash.h>
+#include <memory>
+
+class xx_hasher {
+ static constexpr size_t digest_size = 16;
+
+ struct xxhash_state_deleter {
+ void operator()(XXH64_state_t* x) const { XXH64_freeState(x); }
+ };
+
+ std::unique_ptr<XXH64_state_t, xxhash_state_deleter> _state;
+
+public:
+ xx_hasher() : _state(std::unique_ptr<XXH64_state_t, xxhash_state_deleter>(XXH64_createState())) {
+ XXH64_reset(_state.get(), 0);
+ }
+
+ xx_hasher(xx_hasher&&) noexcept = default;
+
+ xx_hasher& operator=(xx_hasher&&) noexcept = default;
+
+ xx_hasher(const xx_hasher& other) : xx_hasher() {
+ XXH64_copyState(_state.get(), other._state.get());
+ }
+
+ xx_hasher& operator=(const xx_hasher& other) {
+ return operator=(xx_hasher(other));
+ }
+
+ void update(const char* ptr, size_t length) {
+ XXH64_update(_state.get(), ptr, length);
+ }
+
+ bytes finalize() {
+ bytes digest{bytes::initialized_later(), digest_size};
+ serialize_to(digest.begin());
+ return digest;
+ }
+
+ std::array<uint8_t, 16> finalize_array() {
+ std::array<uint8_t, 16> digest;
+ serialize_to(digest.begin());
+ return digest;
+ };
+
+ uint64_t finalize_uint64() {
+ return XXH64_digest(_state.get());
+ }
+
+private:
+ template<typename OutIterator>
+ void serialize_to(OutIterator&& out) {
+ serialize_int64(out, 0);
+ serialize_int64(out, finalize_uint64());
+ }
+};
--
2.15.1

Duarte Nunes

<duarte@scylladb.com>
unread,
Jan 16, 2018, 9:36:02 AM1/16/18
to scylladb-dev@googlegroups.com
Signed-off-by: Duarte Nunes <dua...@scylladb.com>
---
digest_algorithm.hh | 1 +
service/storage_proxy.cc | 2 +-
2 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/digest_algorithm.hh b/digest_algorithm.hh
index 966f18da43..26e31b0c11 100644
--- a/digest_algorithm.hh
+++ b/digest_algorithm.hh
@@ -26,6 +26,7 @@ namespace query {
enum class digest_algorithm : uint8_t {
none = 0, // digest not required
MD5 = 1, // default algorithm
+ xxHash = 2,
};

}
diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc
index 498ca12b44..4b6d5b4e6c 100644
--- a/service/storage_proxy.cc
+++ b/service/storage_proxy.cc
@@ -3816,7 +3816,7 @@ void storage_proxy::init_messaging_service() {
case query::digest_algorithm::none:
qrr = query::result_request::only_result;
break;
- case query::digest_algorithm::MD5:
+ default:
qrr = query::result_request::result_and_digest;
break;
}
--
2.15.1

Duarte Nunes

<duarte@scylladb.com>
unread,
Jan 16, 2018, 9:36:04 AM1/16/18
to scylladb-dev@googlegroups.com
Signed-off-by: Duarte Nunes <dua...@scylladb.com>
---
md5_hasher.hh | 8 +++++---
1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/md5_hasher.hh b/md5_hasher.hh
index 0de64e5b33..379aff14ac 100644
--- a/md5_hasher.hh
+++ b/md5_hasher.hh
@@ -30,19 +30,21 @@
class md5_hasher {
CryptoPP::Weak::MD5 hash{};
public:
+ static constexpr size_t size = CryptoPP::Weak::MD5::DIGESTSIZE;
+
void update(const char* ptr, size_t length) {
static_assert(sizeof(char) == sizeof(byte), "Assuming lengths will be the same");
hash.Update(reinterpret_cast<const byte*>(ptr), length * sizeof(byte));
}

bytes finalize() {
- bytes digest{bytes::initialized_later(), CryptoPP::Weak::MD5::DIGESTSIZE};
+ bytes digest{bytes::initialized_later(), size};
hash.Final(reinterpret_cast<unsigned char*>(digest.begin()));
return digest;
}

- std::array<uint8_t, CryptoPP::Weak::MD5::DIGESTSIZE> finalize_array() {
- std::array<uint8_t, CryptoPP::Weak::MD5::DIGESTSIZE> array;
+ std::array<uint8_t, size> finalize_array() {
+ std::array<uint8_t, size> array;
hash.Final(reinterpret_cast<unsigned char*>(array.data()));
return array;
}
--
2.15.1

Duarte Nunes

<duarte@scylladb.com>
unread,
Jan 16, 2018, 9:36:05 AM1/16/18
to scylladb-dev@googlegroups.com
This patch paves the way for us to encapsulate the actual digest
algorithm used for a query. The digester class dispatches to a
concrete implementation based on the digest algorithm being used. It
wraps the xxHash algorithm to provide a 128 bit hash, which is the
size of digest expected by the inter-node protocol.

Signed-off-by: Duarte Nunes <dua...@scylladb.com>
---
digester.hh | 70 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 70 insertions(+)
create mode 100644 digester.hh

diff --git a/digester.hh b/digester.hh
new file mode 100644
index 0000000000..59bd5ad07b
--- /dev/null
+++ b/digester.hh
@@ -0,0 +1,70 @@
+/*
+ * Copyright (C) 2018 ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include "digest_algorithm.hh"
+#include "md5_hasher.hh"
+#include "xx_hasher.hh"
+
+#include <stdexcept>
+#include <variant>
+
+namespace query {
+
+class digester final {
+ struct noop_hasher {
+ void update(const char* ptr, size_t length) { }
+ std::array<uint8_t, 16> finalize_array() { return std::array<uint8_t, 16>(); };
+ };
+
+ std::variant<noop_hasher, md5_hasher, xx_hasher> _impl;
+
+public:
+ explicit digester(digest_algorithm algo) {
+ switch (algo) {
+ case digest_algorithm::MD5:
+ _impl = md5_hasher();
+ break;
+ case digest_algorithm::xxHash:
+ _impl = xx_hasher();
+ break;
+ case digest_algorithm ::none:
+ _impl = noop_hasher();
+ break;
+ }
+ }
+
+ template<typename T, typename... Args>
+ void feed_hash(const T& value, Args&&... args) {
+ std::visit([&] (auto& hasher) {
+ ::feed_hash(hasher, value, std::forward<Args>(args)...);
+ }, _impl);
+ };
+
+ std::array<uint8_t, 16> finalize_array() {
+ return std::visit([&] (auto& hasher) {
+ return hasher.finalize_array();
+ }, _impl);
+ }
+};
+
+}
\ No newline at end of file
--
2.15.1

Duarte Nunes

<duarte@scylladb.com>
unread,
Jan 16, 2018, 9:36:08 AM1/16/18
to scylladb-dev@googlegroups.com
Introduce class result_options to carry result options through the
request pipeline, which at this point mean the result type and the
digest algorithm. This class allows us to encapsulate the concrete
digest algorithm to use.

Signed-off-by: Duarte Nunes <dua...@scylladb.com>
---
database.hh | 5 +++--
mutation.hh | 4 ++--
mutation_query.hh | 2 +-
query-result-writer.hh | 4 ++--
query-result.hh | 14 ++++++++++++++
service/storage_proxy.hh | 2 +-
database.cc | 16 ++++++++--------
mutation.cc | 8 ++++----
mutation_query.cc | 4 ++--
query-result-set.cc | 2 +-
service/storage_proxy.cc | 36 +++++++++++++++---------------------
tests/database_test.cc | 6 +++---
tests/memory_footprint.cc | 2 +-
tests/mutation_test.cc | 7 ++++---
14 files changed, 61 insertions(+), 51 deletions(-)

diff --git a/database.hh b/database.hh
index aadee14d8c..07c13323c2 100644
--- a/database.hh
+++ b/database.hh
@@ -637,7 +637,8 @@ class column_family : public enable_lw_shared_from_this<column_family> {

// Returns at most "cmd.limit" rows
future<lw_shared_ptr<query::result>> query(schema_ptr,
- const query::read_command& cmd, query::result_request request,
+ const query::read_command& cmd,
+ query::result_options opts,
const dht::partition_range_vector& ranges,
tracing::trace_state_ptr trace_state,
query::result_memory_limiter& memory_limiter,
@@ -1185,7 +1186,7 @@ class database {
unsigned shard_of(const dht::token& t);
unsigned shard_of(const mutation& m);
unsigned shard_of(const frozen_mutation& m);
- future<lw_shared_ptr<query::result>, cache_temperature> query(schema_ptr, const query::read_command& cmd, query::result_request request,
+ future<lw_shared_ptr<query::result>, cache_temperature> query(schema_ptr, const query::read_command& cmd, query::result_options opts,
const dht::partition_range_vector& ranges, tracing::trace_state_ptr trace_state,
uint64_t max_result_size, db::timeout_clock::time_point timeout = db::no_timeout);
future<reconcilable_result, cache_temperature> query_mutations(schema_ptr, const query::read_command& cmd, const dht::partition_range& range,
diff --git a/mutation.hh b/mutation.hh
index b94ab6b87d..9894dda204 100644
--- a/mutation.hh
+++ b/mutation.hh
@@ -108,14 +108,14 @@ class mutation final {
public:
// The supplied partition_slice must be governed by this mutation's schema
query::result query(const query::partition_slice&,
- query::result_request request = query::result_request::only_result,
+ query::result_options opts = query::result_options::only_result(),
gc_clock::time_point now = gc_clock::now(),
uint32_t row_limit = query::max_rows) &&;

// The supplied partition_slice must be governed by this mutation's schema
// FIXME: Slower than the r-value version
query::result query(const query::partition_slice&,
- query::result_request request = query::result_request::only_result,
+ query::result_options opts = query::result_options::only_result(),
gc_clock::time_point now = gc_clock::now(),
uint32_t row_limit = query::max_rows) const&;

diff --git a/mutation_query.hh b/mutation_query.hh
index ee29d5248f..baf9cbcfd2 100644
--- a/mutation_query.hh
+++ b/mutation_query.hh
@@ -106,7 +106,7 @@ class reconcilable_result {
printer pretty_printer(schema_ptr) const;
};

-query::result to_data_query_result(const reconcilable_result&, schema_ptr, const query::partition_slice&, uint32_t row_limit, uint32_t partition_limit, query::result_request result_type = query::result_request::only_result);
+query::result to_data_query_result(const reconcilable_result&, schema_ptr, const query::partition_slice&, uint32_t row_limit, uint32_t partition_limit, query::result_options opts = query::result_options::only_result());

// Performs a query on given data source returning data in reconcilable form.
//
diff --git a/query-result-writer.hh b/query-result-writer.hh
index 664bf377e2..afaed6de0e 100644
--- a/query-result-writer.hh
+++ b/query-result-writer.hh
@@ -131,10 +131,10 @@ class result::builder {
short_read _short_read;
result_memory_accounter _memory_accounter;
public:
- builder(const partition_slice& slice, result_request request, result_memory_accounter memory_accounter)
+ builder(const partition_slice& slice, result_options options, result_memory_accounter memory_accounter)
: _slice(slice)
, _w(ser::writer_of_query_result<bytes_ostream>(_out).start_partitions())
- , _request(request)
+ , _request(options.request)
, _memory_accounter(std::move(memory_accounter))
{ }
builder(builder&&) = delete; // _out is captured by reference
diff --git a/query-result.hh b/query-result.hh
index 4e4681e86c..21270f4e35 100644
--- a/query-result.hh
+++ b/query-result.hh
@@ -24,6 +24,7 @@
#define CRYPTOPP_ENABLE_NAMESPACE_WEAK 1
#include <cryptopp/md5.h>
#include "bytes_ostream.hh"
+#include "digest_algorithm.hh"
#include "query-request.hh"
#include "md5_hasher.hh"
#include <experimental/optional>
@@ -268,6 +269,19 @@ enum class result_request {
result_and_digest,
};

+struct result_options {
+ result_request request = result_request::only_result;
+ digest_algorithm digest_algo = query::digest_algorithm::none;
+
+ static result_options only_result() {
+ return result_options{};
+ }
+
+ static result_options only_digest(digest_algorithm da) {
+ return {result_request::only_digest, da};
+ }
+};
+
class result_digest {
public:
static_assert(16 == CryptoPP::Weak::MD5::DIGESTSIZE, "MD5 digest size is all wrong");
diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh
index c5bbf915e4..0fc9a5fb0c 100644
--- a/service/storage_proxy.hh
+++ b/service/storage_proxy.hh
@@ -253,7 +253,7 @@ class storage_proxy : public seastar::async_sharded_service<storage_proxy> /*imp
db::read_repair_decision new_read_repair_decision(const schema& s);
::shared_ptr<abstract_read_executor> get_read_executor(lw_shared_ptr<query::read_command> cmd, dht::partition_range pr, db::consistency_level cl, tracing::trace_state_ptr trace_state);
future<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature> query_result_local(schema_ptr, lw_shared_ptr<query::read_command> cmd, const dht::partition_range& pr,
- query::result_request request,
+ query::result_options opts,
tracing::trace_state_ptr trace_state,
clock_type::time_point timeout,
uint64_t max_size = query::result_memory_limiter::maximum_result_size);
diff --git a/database.cc b/database.cc
index 41978b35ad..2dedb8ff5b 100644
--- a/database.cc
+++ b/database.cc
@@ -2911,12 +2911,12 @@ compare_atomic_cell_for_merge(atomic_cell_view left, atomic_cell_view right) {
struct query_state {
explicit query_state(schema_ptr s,
const query::read_command& cmd,
- query::result_request request,
+ query::result_options opts,
const dht::partition_range_vector& ranges,
query::result_memory_accounter memory_accounter = { })
: schema(std::move(s))
, cmd(cmd)
- , builder(cmd.slice, request, std::move(memory_accounter))
+ , builder(cmd.slice, opts, std::move(memory_accounter))
, limit(cmd.row_limit)
, partition_limit(cmd.partition_limit)
, current_partition_range(ranges.begin())
@@ -2943,16 +2943,16 @@ struct query_state {
};

future<lw_shared_ptr<query::result>>
-column_family::query(schema_ptr s, const query::read_command& cmd, query::result_request request,
+column_family::query(schema_ptr s, const query::read_command& cmd, query::result_options opts,
const dht::partition_range_vector& partition_ranges,
tracing::trace_state_ptr trace_state, query::result_memory_limiter& memory_limiter,
uint64_t max_size, db::timeout_clock::time_point timeout) {
utils::latency_counter lc;
_stats.reads.set_latency(lc);
- auto f = request == query::result_request::only_digest
+ auto f = opts.request == query::result_request::only_digest
? memory_limiter.new_digest_read(max_size) : memory_limiter.new_data_read(max_size);
- return f.then([this, lc, s = std::move(s), &cmd, request, &partition_ranges, trace_state = std::move(trace_state), timeout] (query::result_memory_accounter accounter) mutable {
- auto qs_ptr = std::make_unique<query_state>(std::move(s), cmd, request, partition_ranges, std::move(accounter));
+ return f.then([this, lc, s = std::move(s), &cmd, opts, &partition_ranges, trace_state = std::move(trace_state), timeout] (query::result_memory_accounter accounter) mutable {
+ auto qs_ptr = std::make_unique<query_state>(std::move(s), cmd, opts, partition_ranges, std::move(accounter));
auto& qs = *qs_ptr;
return do_until(std::bind(&query_state::done, &qs), [this, &qs, trace_state = std::move(trace_state), timeout] {
auto&& range = *qs.current_partition_range++;
@@ -3001,10 +3001,10 @@ std::chrono::milliseconds column_family::get_coordinator_read_latency_percentile
static thread_local auto data_query_stage = seastar::make_execution_stage("data_query", &column_family::query);

future<lw_shared_ptr<query::result>, cache_temperature>
-database::query(schema_ptr s, const query::read_command& cmd, query::result_request request, const dht::partition_range_vector& ranges,
+database::query(schema_ptr s, const query::read_command& cmd, query::result_options opts, const dht::partition_range_vector& ranges,
tracing::trace_state_ptr trace_state, uint64_t max_result_size, db::timeout_clock::time_point timeout) {
column_family& cf = find_column_family(cmd.cf_id);
- return data_query_stage(&cf, std::move(s), seastar::cref(cmd), request, seastar::cref(ranges),
+ return data_query_stage(&cf, std::move(s), seastar::cref(cmd), opts, seastar::cref(ranges),
std::move(trace_state), seastar::ref(get_result_memory_limiter()),
max_result_size,
timeout).then_wrapped([this, s = _stats, hit_rate = cf.get_global_cache_hit_rate()] (auto f) {
diff --git a/mutation.cc b/mutation.cc
index 7a457278c3..4fb3c62594 100644
--- a/mutation.cc
+++ b/mutation.cc
@@ -120,20 +120,20 @@ mutation::query(query::result::builder& builder,

query::result
mutation::query(const query::partition_slice& slice,
- query::result_request request,
+ query::result_options opts,
gc_clock::time_point now, uint32_t row_limit) &&
{
- query::result::builder builder(slice, request, { });
+ query::result::builder builder(slice, opts, { });
std::move(*this).query(builder, slice, now, row_limit);
return builder.build();
}

query::result
mutation::query(const query::partition_slice& slice,
- query::result_request request,
+ query::result_options opts,
gc_clock::time_point now, uint32_t row_limit) const&
{
- return mutation(*this).query(slice, request, now, row_limit);
+ return mutation(*this).query(slice, opts, now, row_limit);
}

size_t
diff --git a/mutation_query.cc b/mutation_query.cc
index b501f9705c..0dfc128802 100644
--- a/mutation_query.cc
+++ b/mutation_query.cc
@@ -57,8 +57,8 @@ bool reconcilable_result::operator!=(const reconcilable_result& other) const {
}

query::result
-to_data_query_result(const reconcilable_result& r, schema_ptr s, const query::partition_slice& slice, uint32_t max_rows, uint32_t max_partitions, query::result_request result_type) {
- query::result::builder builder(slice, result_type, { });
+to_data_query_result(const reconcilable_result& r, schema_ptr s, const query::partition_slice& slice, uint32_t max_rows, uint32_t max_partitions, query::result_options opts) {
+ query::result::builder builder(slice, opts, { });
for (const partition& p : r.partitions()) {
if (builder.row_count() >= max_rows || builder.partition_count() >= max_partitions) {
break;
diff --git a/query-result-set.cc b/query-result-set.cc
index f849230d6d..2acca12cc0 100644
--- a/query-result-set.cc
+++ b/query-result-set.cc
@@ -193,7 +193,7 @@ result_set::from_raw_result(schema_ptr s, const partition_slice& slice, const re

result_set::result_set(const mutation& m) : result_set([&m] {
auto slice = partition_slice_builder(*m.schema()).build();
- auto qr = mutation(m).query(slice, result_request::only_result);
+ auto qr = mutation(m).query(slice, result_options::only_result());
return result_set::from_raw_result(m.schema(), slice, qr);
}())
{ }
diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc
index 4b6d5b4e6c..734d815dd1 100644
--- a/service/storage_proxy.cc
+++ b/service/storage_proxy.cc
@@ -2503,15 +2503,16 @@ class abstract_read_executor : public enable_shared_from_this<abstract_read_exec
}
future<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature> make_data_request(gms::inet_address ep, clock_type::time_point timeout, bool want_digest) {
++_proxy->_stats.data_read_attempts.get_ep_stat(ep);
+ auto opts = want_digest
+ ? query::result_options{query::result_request::result_and_digest, query::digest_algorithm::MD5}
+ : query::result_options{query::result_request::only_result, query::digest_algorithm::none};
if (fbu::is_me(ep)) {
tracing::trace(_trace_state, "read_data: querying locally");
- auto qrr = want_digest ? query::result_request::result_and_digest : query::result_request::only_result;
- return _proxy->query_result_local(_schema, _cmd, _partition_range, qrr, _trace_state, timeout);
+ return _proxy->query_result_local(_schema, _cmd, _partition_range, opts, _trace_state, timeout);
} else {
auto& ms = netw::get_local_messaging_service();
tracing::trace(_trace_state, "read_data: sending a message to /{}", ep);
- auto da = want_digest ? query::digest_algorithm::MD5 : query::digest_algorithm::none;
- return ms.send_read_data(netw::messaging_service::msg_addr{ep, 0}, timeout, *_cmd, _partition_range, da).then([this, ep](query::result&& result, rpc::optional<cache_temperature> hit_rate) {
+ return ms.send_read_data(netw::messaging_service::msg_addr{ep, 0}, timeout, *_cmd, _partition_range, opts.digest_algo).then([this, ep](query::result&& result, rpc::optional<cache_temperature> hit_rate) {
tracing::trace(_trace_state, "read_data: got response from /{}", ep);
return make_ready_future<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>(make_foreign(::make_lw_shared<query::result>(std::move(result))), hit_rate.value_or(cache_temperature::invalid()));
});
@@ -2906,27 +2907,27 @@ ::shared_ptr<abstract_read_executor> storage_proxy::get_read_executor(lw_shared_

future<query::result_digest, api::timestamp_type, cache_temperature>
storage_proxy::query_result_local_digest(schema_ptr s, lw_shared_ptr<query::read_command> cmd, const dht::partition_range& pr, tracing::trace_state_ptr trace_state, storage_proxy::clock_type::time_point timeout, uint64_t max_size) {
- return query_result_local(std::move(s), std::move(cmd), pr, query::result_request::only_digest, std::move(trace_state), timeout, max_size).then([] (foreign_ptr<lw_shared_ptr<query::result>> result, cache_temperature hit_rate) {
+ return query_result_local(std::move(s), std::move(cmd), pr, query::result_options::only_digest(query::digest_algorithm::MD5), std::move(trace_state), timeout, max_size).then([] (foreign_ptr<lw_shared_ptr<query::result>> result, cache_temperature hit_rate) {
return make_ready_future<query::result_digest, api::timestamp_type, cache_temperature>(*result->digest(), result->last_modified(), hit_rate);
});
}

future<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>
-storage_proxy::query_result_local(schema_ptr s, lw_shared_ptr<query::read_command> cmd, const dht::partition_range& pr, query::result_request request,
+storage_proxy::query_result_local(schema_ptr s, lw_shared_ptr<query::read_command> cmd, const dht::partition_range& pr, query::result_options opts,
tracing::trace_state_ptr trace_state, storage_proxy::clock_type::time_point timeout, uint64_t max_size) {
if (pr.is_singular()) {
unsigned shard = _db.local().shard_of(pr.start()->value().token());
- return _db.invoke_on(shard, [max_size, gs = global_schema_ptr(s), prv = dht::partition_range_vector({pr}) /* FIXME: pr is copied */, cmd, request, timeout, gt = tracing::global_trace_state_ptr(std::move(trace_state))] (database& db) mutable {
+ return _db.invoke_on(shard, [max_size, gs = global_schema_ptr(s), prv = dht::partition_range_vector({pr}) /* FIXME: pr is copied */, cmd, opts, timeout, gt = tracing::global_trace_state_ptr(std::move(trace_state))] (database& db) mutable {
tracing::trace(gt, "Start querying the token range that starts with {}", seastar::value_of([&prv] { return prv.begin()->start()->value().token(); }));
- return db.query(gs, *cmd, request, prv, gt, max_size, timeout).then([trace_state = gt.get()](auto&& f, cache_temperature ht) {
+ return db.query(gs, *cmd, opts, prv, gt, max_size, timeout).then([trace_state = gt.get()](auto&& f, cache_temperature ht) {
tracing::trace(trace_state, "Querying is done");
return make_ready_future<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>(make_foreign(std::move(f)), ht);
});
});
} else {
- return query_nonsingular_mutations_locally(s, cmd, {pr}, std::move(trace_state), max_size, timeout).then([s, cmd, request] (foreign_ptr<lw_shared_ptr<reconcilable_result>>&& r, cache_temperature&& ht) {
+ return query_nonsingular_mutations_locally(s, cmd, {pr}, std::move(trace_state), max_size, timeout).then([s, cmd, opts] (foreign_ptr<lw_shared_ptr<reconcilable_result>>&& r, cache_temperature&& ht) {
return make_ready_future<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>(
- ::make_foreign(::make_lw_shared(to_data_query_result(*r, s, cmd->slice, cmd->row_limit, cmd->partition_limit, request))), ht);
+ ::make_foreign(::make_lw_shared(to_data_query_result(*r, s, cmd->slice, cmd->row_limit, cmd->partition_limit, opts))), ht);
});
}
}
@@ -3811,18 +3812,11 @@ void storage_proxy::init_messaging_service() {
// this function assumes singular queries but doesn't validate
throw std::runtime_error("READ_DATA called with wrapping range");
}
- query::result_request qrr;
- switch (da) {
- case query::digest_algorithm::none:
- qrr = query::result_request::only_result;
- break;
- default:
- qrr = query::result_request::result_and_digest;
- break;
- }
-
+ query::result_options opts;
+ opts.digest_algo = da;
+ opts.request = da == query::digest_algorithm::none ? query::result_request::only_result : query::result_request::result_and_digest;
auto timeout = t ? *t : db::no_timeout;
- return p->query_result_local(std::move(s), cmd, std::move(pr2.first), qrr, trace_state_ptr, timeout, max_size);
+ return p->query_result_local(std::move(s), cmd, std::move(pr2.first), opts, trace_state_ptr, timeout, max_size);
}).finally([&trace_state_ptr, src_ip] () mutable {
tracing::trace(trace_state_ptr, "read_data handling is done, sending a response to /{}", src_ip);
});
diff --git a/tests/database_test.cc b/tests/database_test.cc
index d1c0086267..abb3a8cb7d 100644
--- a/tests/database_test.cc
+++ b/tests/database_test.cc
@@ -54,21 +54,21 @@ SEASTAR_TEST_CASE(test_querying_with_limits) {
auto max_size = std::numeric_limits<size_t>::max();
{
auto cmd = query::read_command(s->id(), s->version(), partition_slice_builder(*s).build(), 3);
- auto result = db.query(s, cmd, query::result_request::only_result, pranges, nullptr, max_size).get0();
+ auto result = db.query(s, cmd, query::result_options::only_result(), pranges, nullptr, max_size).get0();
assert_that(query::result_set::from_raw_result(s, cmd.slice, *result)).has_size(3);
}

{
auto cmd = query::read_command(s->id(), s->version(), partition_slice_builder(*s).build(),
query::max_rows, gc_clock::now(), std::experimental::nullopt, 5);
- auto result = db.query(s, cmd, query::result_request::only_result, pranges, nullptr, max_size).get0();
+ auto result = db.query(s, cmd, query::result_options::only_result(), pranges, nullptr, max_size).get0();
assert_that(query::result_set::from_raw_result(s, cmd.slice, *result)).has_size(5);
}

{
auto cmd = query::read_command(s->id(), s->version(), partition_slice_builder(*s).build(),
query::max_rows, gc_clock::now(), std::experimental::nullopt, 3);
- auto result = db.query(s, cmd, query::result_request::only_result, pranges, nullptr, max_size).get0();
+ auto result = db.query(s, cmd, query::result_options::only_result(), pranges, nullptr, max_size).get0();
assert_that(query::result_set::from_raw_result(s, cmd.slice, *result)).has_size(3);
}
});
diff --git a/tests/memory_footprint.cc b/tests/memory_footprint.cc
index 4f8756ba66..c7d9009453 100644
--- a/tests/memory_footprint.cc
+++ b/tests/memory_footprint.cc
@@ -184,7 +184,7 @@ static sizes calculate_sizes(const mutation& m) {
result.cache = tracker.region().occupancy().used_space() - cache_initial_occupancy;
result.frozen = freeze(m).representation().size();
result.canonical = canonical_mutation(m).representation().size();
- result.query_result = m.query(partition_slice_builder(*s).build(), query::result_request::only_result).buf().size();
+ result.query_result = m.query(partition_slice_builder(*s).build(), query::result_options::only_result()).buf().size();

tmpdir sstable_dir;
auto sst = sstables::make_sstable(s,
diff --git a/tests/mutation_test.cc b/tests/mutation_test.cc
index 906d82643e..fd3836f932 100644
--- a/tests/mutation_test.cc
+++ b/tests/mutation_test.cc
@@ -991,8 +991,8 @@ SEASTAR_TEST_CASE(test_query_digest) {
auto check_digests_equal = [] (const mutation& m1, const mutation& m2) {
auto ps1 = partition_slice_builder(*m1.schema()).build();
auto ps2 = partition_slice_builder(*m2.schema()).build();
- auto digest1 = *m1.query(ps1, query::result_request::only_digest).digest();
- auto digest2 = *m2.query(ps2, query::result_request::only_digest).digest();
+ auto digest1 = *m1.query(ps1, query::result_options::only_digest(query::digest_algorithm::MD5)).digest();
+ auto digest2 = *m2.query(ps2, query::result_options::only_digest(query::digest_algorithm::MD5)).digest();
if (digest1 != digest2) {
BOOST_FAIL(sprint("Digest should be the same for %s and %s", m1, m2));
}
@@ -1177,7 +1177,8 @@ SEASTAR_TEST_CASE(test_querying_expired_cells) {
.without_clustering_key_columns()
.without_partition_key_columns()
.build();
- return query::result_set::from_raw_result(s, slice, m.query(slice, query::result_request::result_and_digest, t));
+ auto opts = query::result_options{query::result_request::result_and_digest, query::digest_algorithm::MD5};
+ return query::result_set::from_raw_result(s, slice, m.query(slice, opts, t));
};

{
--
2.15.1

Duarte Nunes

<duarte@scylladb.com>
unread,
Jan 16, 2018, 9:36:08 AM1/16/18
to scylladb-dev@googlegroups.com
Replace the feed_hash() member function of partition_key and
clustering_key_prefix with the specialization of appending_hash,
so that we can use the general feed_hash() function.

Signed-off-by: Duarte Nunes <dua...@scylladb.com>
---
hashing_partition_visitor.hh | 2 +-
keys.hh | 47 +++++++++++++++++++++++++++++++++-----------
mutation.hh | 2 +-
position_in_partition.hh | 2 +-
repair/repair.cc | 4 ++--
5 files changed, 40 insertions(+), 17 deletions(-)

diff --git a/hashing_partition_visitor.hh b/hashing_partition_visitor.hh
index 83c5bb785a..b9877c80f4 100644
--- a/hashing_partition_visitor.hh
+++ b/hashing_partition_visitor.hh
@@ -67,7 +67,7 @@ class hashing_partition_visitor : public mutation_partition_visitor {
if (dummy) {
return;
}
- pos.key().feed_hash(_h, _s);
+ feed_hash(_h, pos.key(), _s);
feed_hash(_h, deleted_at);
feed_hash(_h, rm);
}
diff --git a/keys.hh b/keys.hh
index 7ff5d56c3f..81f112c7e1 100644
--- a/keys.hh
+++ b/keys.hh
@@ -146,13 +146,6 @@ class compound_view_wrapper {
auto components(const schema& s) const {
return components();
}
-
- template<typename Hasher>
- void feed_hash(Hasher& h, const schema& s) const {
- for (bytes_view v : components(s)) {
- ::feed_hash(h, v);
- }
- }
};

template <typename TopLevel, typename TopLevelView>
@@ -327,11 +320,6 @@ class compound_wrapper {
return *it;
}

- template<typename Hasher>
- void feed_hash(Hasher& h, const schema& s) const {
- view().feed_hash(h, s);
- }
-
// Returns the number of components of this compound.
size_t size(const schema& s) const {
return std::distance(begin(s), end(s));
@@ -777,3 +765,38 @@ class clustering_key_prefix : public prefix_compound_wrapper<clustering_key_pref
friend std::ostream& operator<<(std::ostream& out, const clustering_key_prefix& ckp);
};

+template<>
+struct appending_hash<partition_key_view> {
+ template<typename Hasher>
+ void operator()(Hasher& h, const partition_key& pk, const schema& s) const {
+ for (bytes_view v : pk.components(s)) {
+ ::feed_hash(h, v);
+ }
+ }
+};
+
+template<>
+struct appending_hash<partition_key> {
+ template<typename Hasher>
+ void operator()(Hasher& h, const partition_key& pk, const schema& s) const {
+ appending_hash<partition_key_view>()(h, pk.view(), s);
+ }
+};
+
+template<>
+struct appending_hash<clustering_key_prefix_view> {
+ template<typename Hasher>
+ void operator()(Hasher& h, const clustering_key_prefix_view& ck, const schema& s) const {
+ for (bytes_view v : ck.components(s)) {
+ ::feed_hash(h, v);
+ }
+ }
+};
+
+template<>
+struct appending_hash<clustering_key_prefix> {
+ template<typename Hasher>
+ void operator()(Hasher& h, const clustering_key_prefix& ck, const schema& s) const {
+ appending_hash<clustering_key_prefix_view>()(h, ck.view(), s);
+ }
+};
\ No newline at end of file
diff --git a/mutation.hh b/mutation.hh
index 9894dda204..52d2b2c30d 100644
--- a/mutation.hh
+++ b/mutation.hh
@@ -157,7 +157,7 @@ struct appending_hash<mutation> {
template<typename Hasher>
void operator()(Hasher& h, const mutation& m) const {
const schema& s = *m.schema();
- m.key().feed_hash(h, s);
+ feed_hash(h, m.key(), s);
m.partition().feed_hash(h, s);
}
};
diff --git a/position_in_partition.hh b/position_in_partition.hh
index 81a21290a7..e29790422a 100644
--- a/position_in_partition.hh
+++ b/position_in_partition.hh
@@ -267,7 +267,7 @@ class position_in_partition {
::feed_hash(hasher, _bound_weight);
if (_ck) {
::feed_hash(hasher, true);
- _ck->feed_hash(hasher, s);
+ ::feed_hash(hasher, *_ck, s);
} else {
::feed_hash(hasher, false);
}
diff --git a/repair/repair.cc b/repair/repair.cc
index ad0d0a3265..28d35d2d93 100644
--- a/repair/repair.cc
+++ b/repair/repair.cc
@@ -526,7 +526,7 @@ class partition_hasher {
: _schema(s), _cmp(s), _rt_list(s) { }

void consume_new_partition(const dht::decorated_key& dk) {
- dk.key().feed_hash(_hasher, _schema);
+ feed_hash(_hasher, dk.key(), _schema);
}

stop_iteration consume(tombstone t) {
@@ -545,7 +545,7 @@ class partition_hasher {
stop_iteration consume(const clustering_row& cr) {
consume_range_tombstones_until(cr);

- cr.key().feed_hash(_hasher, _schema);
+ feed_hash(_hasher, cr.key(), _schema);
feed_hash(_hasher, cr.tomb());
feed_hash(_hasher, cr.marker());
cr.cells().for_each_cell([&] (column_id id, const atomic_cell_or_collection& cell) {
--
2.15.1

Duarte Nunes

<duarte@scylladb.com>
unread,
Jan 16, 2018, 9:36:09 AM1/16/18
to scylladb-dev@googlegroups.com
Replace range_tombstone::feed_hash() with the specialization of
appending_hash, so that we can use the general feed_hash() function.

Signed-off-by: Duarte Nunes <dua...@scylladb.com>
---
hashing_partition_visitor.hh | 2 +-
range_tombstone.hh | 28 ++++++++++++++++------------
repair/repair.cc | 4 ++--
3 files changed, 19 insertions(+), 15 deletions(-)

diff --git a/hashing_partition_visitor.hh b/hashing_partition_visitor.hh
index b9877c80f4..f53515c696 100644
--- a/hashing_partition_visitor.hh
+++ b/hashing_partition_visitor.hh
@@ -60,7 +60,7 @@ class hashing_partition_visitor : public mutation_partition_visitor {
}

virtual void accept_row_tombstone(const range_tombstone& rt) {
- rt.feed_hash(_h, _s);
+ feed_hash(_h, rt, _s);
}

virtual void accept_row(position_in_partition_view pos, const row_tombstone& deleted_at, const row_marker& rm, is_dummy dummy, is_continuous continuous) override {
diff --git a/range_tombstone.hh b/range_tombstone.hh
index 146f96fdc0..585c147237 100644
--- a/range_tombstone.hh
+++ b/range_tombstone.hh
@@ -110,18 +110,6 @@ class range_tombstone final {
return _c(rt1.start_bound(), rt2.start_bound());
}
};
- template<typename Hasher>
- void feed_hash(Hasher& h, const schema& s) const {
- start.feed_hash(h, s);
- // For backward compatibility, don't consider new fields if
- // this could be an old-style, overlapping, range tombstone.
- if (!start.equal(s, end) || start_kind != bound_kind::incl_start || end_kind != bound_kind::incl_end) {
- ::feed_hash(h, start_kind);
- end.feed_hash(h, s);
- ::feed_hash(h, end_kind);
- }
- ::feed_hash(h, tomb);
- }
friend void swap(range_tombstone& rt1, range_tombstone& rt2) {
range_tombstone tmp(std::move(rt2), without_link());
rt2.move_assign(std::move(rt1));
@@ -193,6 +181,22 @@ class range_tombstone final {
}
};

+template<>
+struct appending_hash<range_tombstone> {
+ template<typename Hasher>
+ void operator()(Hasher& h, const range_tombstone& value, const schema& s) const {
+ feed_hash(h, value.start, s);
+ // For backward compatibility, don't consider new fields if
+ // this could be an old-style, overlapping, range tombstone.
+ if (!value.start.equal(s, value.end) || value.start_kind != bound_kind::incl_start || value.end_kind != bound_kind::incl_end) {
+ feed_hash(h, value.start_kind);
+ feed_hash(h, value.end, s);
+ feed_hash(h, value.end_kind);
+ }
+ feed_hash(h, value.tomb);
+ }
+};
+
// The accumulator expects the incoming range tombstones and clustered rows to
// follow the ordering used by the mutation readers.
//
diff --git a/repair/repair.cc b/repair/repair.cc
index 28d35d2d93..d51f7f1402 100644
--- a/repair/repair.cc
+++ b/repair/repair.cc
@@ -471,13 +471,13 @@ class partition_hasher {
}

void consume_range_tombstone_start(const range_tombstone& rt) {
- rt.start.feed_hash(_hasher, _schema);
+ feed_hash(_hasher, rt.start, _schema);
feed_hash(_hasher, rt.start_kind);
feed_hash(_hasher, rt.tomb);
}

void consume_range_tombstone_end(const range_tombstone& rt) {
- rt.end.feed_hash(_hasher, _schema);
+ feed_hash(_hasher, rt.end, _schema);
feed_hash(_hasher, rt.end_kind);
}

--
2.15.1

Duarte Nunes

<duarte@scylladb.com>
unread,
Jan 16, 2018, 9:36:11 AM1/16/18
to scylladb-dev@googlegroups.com
Use the digester class instead of md5_hasher to encapsulate the
decision of which hash algorithm to use.

Signed-off-by: Duarte Nunes <dua...@scylladb.com>
---
query-result-writer.hh | 15 ++++++++-------
query-result.hh | 4 ----
mutation_partition.cc | 20 ++++++++++----------
3 files changed, 18 insertions(+), 21 deletions(-)

diff --git a/query-result-writer.hh b/query-result-writer.hh
index afaed6de0e..9e9a11d792 100644
--- a/query-result-writer.hh
+++ b/query-result-writer.hh
@@ -26,7 +26,7 @@
#include "query-request.hh"
#include "query-result.hh"
#include "digest_algorithm.hh"
-
+#include "digester.hh"
#include "idl/uuid.dist.hh"
#include "idl/keys.dist.hh"
#include "idl/query.dist.hh"
@@ -48,8 +48,8 @@ class result::partition_writer {
const clustering_row_ranges& _ranges;
ser::query_result__partitions<bytes_ostream>& _pw;
ser::vector_position _pos;
- md5_hasher& _digest;
- md5_hasher _digest_pos;
+ digester& _digest;
+ digester _digest_pos;
uint32_t& _row_count;
uint32_t& _partition_count;
api::timestamp_type& _last_modified;
@@ -61,7 +61,7 @@ class result::partition_writer {
ser::query_result__partitions<bytes_ostream>& pw,
ser::vector_position pos,
ser::after_qr_partition__key<bytes_ostream> w,
- md5_hasher& digest,
+ digester& digest,
uint32_t& row_count,
uint32_t& partition_count,
api::timestamp_type& last_modified)
@@ -104,7 +104,7 @@ class result::partition_writer {
const partition_slice& slice() const {
return _slice;
}
- md5_hasher& digest() {
+ digester& digest() {
return _digest;
}
uint32_t& row_count() {
@@ -121,7 +121,6 @@ class result::partition_writer {

class result::builder {
bytes_ostream _out;
- md5_hasher _digest;
const partition_slice& _slice;
ser::query_result__partitions<bytes_ostream> _w;
result_request _request;
@@ -129,12 +128,14 @@ class result::builder {
uint32_t _partition_count = 0;
api::timestamp_type _last_modified = api::missing_timestamp;
short_read _short_read;
+ digester _digest;
result_memory_accounter _memory_accounter;
public:
builder(const partition_slice& slice, result_options options, result_memory_accounter memory_accounter)
: _slice(slice)
, _w(ser::writer_of_query_result<bytes_ostream>(_out).start_partitions())
, _request(options.request)
+ , _digest(digester(options.digest_algo))
, _memory_accounter(std::move(memory_accounter))
{ }
builder(builder&&) = delete; // _out is captured by reference
@@ -168,7 +169,7 @@ class result::builder {
}
}();
if (_request != result_request::only_result) {
- key.feed_hash(_digest, s);
+ _digest.feed_hash(key, s);
}
return partition_writer(_request, _slice, ranges, _w, std::move(pos), std::move(after_key), _digest, _row_count,
_partition_count, _last_modified);
diff --git a/query-result.hh b/query-result.hh
index 21270f4e35..fd5ce64d63 100644
--- a/query-result.hh
+++ b/query-result.hh
@@ -21,12 +21,9 @@

#pragma once

-#define CRYPTOPP_ENABLE_NAMESPACE_WEAK 1
-#include <cryptopp/md5.h>
#include "bytes_ostream.hh"
#include "digest_algorithm.hh"
#include "query-request.hh"
-#include "md5_hasher.hh"
#include <experimental/optional>
#include <seastar/util/bool_class.hh>
#include "seastarx.hh"
@@ -284,7 +281,6 @@ struct result_options {

class result_digest {
public:
- static_assert(16 == CryptoPP::Weak::MD5::DIGESTSIZE, "MD5 digest size is all wrong");
using type = std::array<uint8_t, 16>;
private:
type _digest;
diff --git a/mutation_partition.cc b/mutation_partition.cc
index 582dd6507c..8198f770e4 100644
--- a/mutation_partition.cc
+++ b/mutation_partition.cc
@@ -610,7 +610,7 @@ void write_counter_cell(RowWriter& w, const query::partition_slice& slice, ::ato
}

// returns the timestamp of a latest update to the row
-static api::timestamp_type hash_row_slice(md5_hasher& hasher,
+static api::timestamp_type hash_row_slice(query::digester& hasher,
const schema& s,
column_kind kind,
const row& cells,
@@ -622,14 +622,14 @@ static api::timestamp_type hash_row_slice(md5_hasher& hasher,
if (!cell) {
continue;
}
- feed_hash(hasher, id);
+ hasher.feed_hash(id);
auto&& def = s.column_at(kind, id);
if (def.is_atomic()) {
- feed_hash(hasher, cell->as_atomic_cell(), def);
+ hasher.feed_hash(cell->as_atomic_cell(), def);
max = std::max(max, cell->as_atomic_cell().timestamp());
} else {
auto&& cm = cell->as_collection_mutation();
- feed_hash(hasher, cm, def);
+ hasher.feed_hash(cm, def);
auto&& ctype = static_pointer_cast<const collection_type_impl>(def.type);
max = std::max(max, ctype->last_update(cm));
}
@@ -714,7 +714,7 @@ mutation_partition::query_compacted(query::result::partition_writer& pw, const s
}
if (pw.requested_digest()) {
auto pt = partition_tombstone();
- ::feed_hash(pw.digest(), pt);
+ pw.digest().feed_hash(pt);
auto t = hash_row_slice(pw.digest(), s, column_kind::static_column, static_row(), slice.static_columns);
pw.last_modified() = std::max({pw.last_modified(), pt.timestamp, t});
}
@@ -736,8 +736,8 @@ mutation_partition::query_compacted(query::result::partition_writer& pw, const s
auto row_tombstone = tombstone_for_row(s, e);

if (pw.requested_digest()) {
- e.key().feed_hash(pw.digest(), s);
- ::feed_hash(pw.digest(), row_tombstone);
+ pw.digest().feed_hash(e.key(), s);
+ pw.digest().feed_hash(row_tombstone);
auto t = hash_row_slice(pw.digest(), s, column_kind::regular_column, row.cells(), slice.regular_columns);
pw.last_modified() = std::max({pw.last_modified(), row_tombstone.tomb().timestamp, t});
}
@@ -1669,7 +1669,7 @@ void mutation_querier::query_static_row(const row& r, tombstone current_tombston
_memory_accounter.update(stream.size());
}
if (_pw.requested_digest()) {
- ::feed_hash(_pw.digest(), current_tombstone);
+ _pw.digest().feed_hash(current_tombstone);
auto t = hash_row_slice(_pw.digest(), _schema, column_kind::static_column,
r, slice.static_columns);
_pw.last_modified() = std::max({_pw.last_modified(), current_tombstone.timestamp, t});
@@ -1698,8 +1698,8 @@ stop_iteration mutation_querier::consume(clustering_row&& cr, row_tombstone curr
const query::partition_slice& slice = _pw.slice();

if (_pw.requested_digest()) {
- cr.key().feed_hash(_pw.digest(), _schema);
- ::feed_hash(_pw.digest(), current_tombstone);
+ _pw.digest().feed_hash(cr.key(), _schema);
+ _pw.digest().feed_hash(current_tombstone);
auto t = hash_row_slice(_pw.digest(), _schema, column_kind::regular_column, cr.cells(), slice.regular_columns);
_pw.last_modified() = std::max({_pw.last_modified(), current_tombstone.tomb().timestamp, t});
}
--
2.15.1

Duarte Nunes

<duarte@scylladb.com>
unread,
Jan 16, 2018, 9:36:12 AM1/16/18
to scylladb-dev@googlegroups.com
Replace the atomic_cell_or_collection::feed_hash() member function
with the specialization of appending_hash, and use that instead.

Signed-off-by: Duarte Nunes <dua...@scylladb.com>
---
atomic_cell_hash.hh | 13 +++++++++++++
atomic_cell_or_collection.hh | 8 --------
repair/repair.cc | 3 ++-
3 files changed, 15 insertions(+), 9 deletions(-)

diff --git a/atomic_cell_hash.hh b/atomic_cell_hash.hh
index 9037f55950..d04810da56 100644
--- a/atomic_cell_hash.hh
+++ b/atomic_cell_hash.hh
@@ -25,6 +25,7 @@

#include "types.hh"
#include "atomic_cell.hh"
+#include "atomic_cell_or_collection.hh"
#include "hashing.hh"
#include "counters.hh"

@@ -78,3 +79,15 @@ struct appending_hash<collection_mutation> {
feed_hash(h, static_cast<collection_mutation_view>(cm), cdef);
}
};
+
+template<>
+struct appending_hash<atomic_cell_or_collection> {
+ template<typename Hasher>
+ void operator()(Hasher& h, const atomic_cell_or_collection& c, const column_definition& cdef) const {
+ if (cdef.is_atomic()) {
+ feed_hash(h, c.as_atomic_cell(), cdef);
+ } else {
+ feed_hash(h, c.as_collection_mutation(), cdef);
+ }
+ }
+};
\ No newline at end of file
diff --git a/atomic_cell_or_collection.hh b/atomic_cell_or_collection.hh
index 4a3ea36588..81565d9d05 100644
--- a/atomic_cell_or_collection.hh
+++ b/atomic_cell_or_collection.hh
@@ -59,14 +59,6 @@ class atomic_cell_or_collection final {
bool operator==(const atomic_cell_or_collection& other) const {
return _data == other._data;
}
- template<typename Hasher>
- void feed_hash(Hasher& h, const column_definition& def) const {
- if (def.is_atomic()) {
- ::feed_hash(h, as_atomic_cell(), def);
- } else {
- ::feed_hash(h, as_collection_mutation(), def);
- }
- }
size_t external_memory_usage() const {
return _data.external_memory_usage();
}
diff --git a/repair/repair.cc b/repair/repair.cc
index d51f7f1402..3ff9b394d0 100644
--- a/repair/repair.cc
+++ b/repair/repair.cc
@@ -22,6 +22,7 @@
#include "repair.hh"
#include "range_split.hh"

+#include "atomic_cell_hash.hh"
#include "streaming/stream_plan.hh"
#include "streaming/stream_state.hh"
#include "gms/inet_address.hh"
@@ -467,7 +468,7 @@ class partition_hasher {
void consume_cell(const column_definition& col, const atomic_cell_or_collection& cell) {
feed_hash(_hasher, col.name());
feed_hash(_hasher, col.type->name());
- cell.feed_hash(_hasher, col);
+ feed_hash(_hasher, cell, col);
}

void consume_range_tombstone_start(const range_tombstone& rt) {
--
2.15.1

Duarte Nunes

<duarte@scylladb.com>
unread,
Jan 16, 2018, 9:36:13 AM1/16/18
to scylladb-dev@googlegroups.com
We add storage to a row to hold the cached hashes of each individual
cell. We don't store the hash in each cell because that would a)
change the cell equality function, and b) require us to change a cell
in a potentially fragmented buffer.

Signed-off-by: Duarte Nunes <dua...@scylladb.com>
---
mutation_partition.hh | 85 ++++++++++++++++++++++++++++++++++++++++++---------
mutation_partition.cc | 69 ++++++++++++++++++++++++++++++-----------
2 files changed, 121 insertions(+), 33 deletions(-)

diff --git a/mutation_partition.hh b/mutation_partition.hh
index 79bfef20ea..2155b1a1f7 100644
--- a/mutation_partition.hh
+++ b/mutation_partition.hh
@@ -23,6 +23,8 @@

#include <iosfwd>
#include <map>
+#include <optional>
+#include <boost/container/small_vector.hpp>
#include <boost/intrusive/set.hpp>
#include <boost/range/iterator_range.hpp>
#include <boost/range/adaptor/indexed.hpp>
@@ -58,10 +60,13 @@ class clustering_row;
// Can be used as a range of row::cell_entry.
//
class row {
+ using hash_type = uint64_t;
+
class cell_entry {
boost::intrusive::set_member_hook<> _link;
column_id _id;
atomic_cell_or_collection _cell;
+ mutable std::optional<hash_type> _hash;
friend class row;
public:
cell_entry(column_id id, atomic_cell_or_collection cell)
@@ -77,6 +82,7 @@ class row {
column_id id() const { return _id; }
const atomic_cell_or_collection& cell() const { return _cell; }
atomic_cell_or_collection& cell() { return _cell; }
+ std::optional<hash_type> hash() const { return _hash; }

struct compare {
bool operator()(const cell_entry& e1, const cell_entry& e2) const {
@@ -112,6 +118,8 @@ class row {
struct vector_storage {
std::bitset<max_vector_size> present;
vector_type v;
+ mutable std::bitset<max_vector_size> hash_present;
+ mutable std::vector<hash_type> hashes;

vector_storage() = default;
vector_storage(const vector_storage&) = default;
@@ -120,6 +128,23 @@ class row {
, v(std::move(other.v)) {
other.present = {};
}
+
+ std::optional<hash_type> hash(column_id id) const {
+ if (hash_present.test(id)) {
+ return std::optional(hashes[id]);
+ }
+ return std::nullopt;
+ }
+
+ void add_hash(column_id id, hash_type hash) const {
+ if (id >= hashes.size()) {
+ hashes.resize(id);
+ hashes.emplace_back(hash);
+ } else {
+ hashes[id] = hash;
+ }
+ hash_present.set(id);
+ }
};

union storage {
@@ -128,6 +153,7 @@ class row {
map_type set;
vector_storage vector;
} _storage;
+
public:
row();
~row();
@@ -137,7 +163,7 @@ class row {
size_t size() const { return _size; }
bool empty() const { return _size == 0; }

- void reserve(column_id);
+ void reserve(column_id, size_t cached_hashes = 0);

const atomic_cell_or_collection& cell_at(column_id id) const;

@@ -155,6 +181,7 @@ class row {
if (func(i, c)) {
c = atomic_cell_or_collection();
_storage.vector.present.reset(i);
+ _storage.vector.hash_present.reset(i);
_size--;
}
}
@@ -195,43 +222,66 @@ class row {

template<typename Func>
void consume_with(Func&&);
+
+ template<typename Func, typename VectorStorage>
+ static constexpr auto maybe_invoke_with_hash(Func& func, VectorStorage& s, column_id id) {
+ if constexpr (std::is_invocable_v<Func, column_id, const atomic_cell_or_collection&, std::optional<hash_type>>) {
+ return func(id, s.v[id], s.hash(id));
+ } else {
+ return func(id, s.v[id]);
+ }
+ }
+
+ template<typename Func, typename CellEntry>
+ static constexpr auto maybe_invoke_with_hash(Func& func, CellEntry& cell) {
+ if constexpr (std::is_invocable_v<Func, column_id, const atomic_cell_or_collection&, std::optional<hash_type>>) {
+ return func(cell.id(), cell.cell(), cell.hash());
+ } else {
+ return func(cell.id(), cell.cell());
+ }
+ }
public:
- // Calls Func(column_id, atomic_cell_or_collection&) for each cell in this row.
+ // Calls Func(column_id, atomic_cell_or_collection&) for each cell in this row,
+ // or Func(column_id, atomic_cell_or_collection&, std::optional<hash_type>) depending
+ // on the concrete Func type.
// noexcept if Func doesn't throw.
template<typename Func>
void for_each_cell(Func&& func) {
if (_type == storage_type::vector) {
for (auto i : bitsets::for_each_set(_storage.vector.present)) {
- func(i, _storage.vector.v[i]);
+ maybe_invoke_with_hash(func, _storage.vector, i);
}
} else {
for (auto& cell : _storage.set) {
- func(cell.id(), cell.cell());
+ maybe_invoke_with_hash(func, cell);
}
}
}

template<typename Func>
void for_each_cell(Func&& func) const {
- for_each_cell_until([func = std::forward<Func>(func)] (column_id id, const atomic_cell_or_collection& c) {
- func(id, c);
- return stop_iteration::no;
- });
+ if (_type == storage_type::vector) {
+ for (auto i : bitsets::for_each_set(_storage.vector.present)) {
+ maybe_invoke_with_hash(func, _storage.vector, i);
+ }
+ } else {
+ for (auto& cell : _storage.set) {
+ maybe_invoke_with_hash(func, cell);
+ }
+ }
}

template<typename Func>
void for_each_cell_until(Func&& func) const {
if (_type == storage_type::vector) {
for (auto i : bitsets::for_each_set(_storage.vector.present)) {
- auto& cell = _storage.vector.v[i];
- if (func(i, cell) == stop_iteration::yes) {
+ if (maybe_invoke_with_hash(func, _storage.vector, i) == stop_iteration::yes) {
break;
}
}
} else {
for (auto& cell : _storage.set) {
- const auto& c = cell.cell();
- if (func(cell.id(), c) == stop_iteration::yes) {
+ if (maybe_invoke_with_hash(func, cell) == stop_iteration::yes) {
break;
}
}
@@ -240,15 +290,16 @@ class row {

// Merges cell's value into the row.
// Weak exception guarantees.
- void apply(const column_definition& column, const atomic_cell_or_collection& cell);
+ void apply(const column_definition& column, const atomic_cell_or_collection& cell, std::optional<hash_type> hash = std::nullopt);

// Merges cell's value into the row.
// Weak exception guarantees.
- void apply(const column_definition& column, atomic_cell_or_collection&& cell);
+ void apply(const column_definition& column, atomic_cell_or_collection&& cell, std::optional<hash_type> hash = std::nullopt);

// Monotonic exception guarantees. In case of exception the sum of cell and this remains the same as before the exception.
- void apply_monotonically(const column_definition& column, atomic_cell_or_collection&& cell);
+ void apply_monotonically(const column_definition& column, atomic_cell_or_collection&& cell, std::optional<hash_type> hash);

+ std::optional<hash_type> apply_monotonically(const column_definition& def, atomic_cell_or_collection& dst, atomic_cell_or_collection& src, std::optional<hash_type> hash);

// Adds cell to the row. The column must not be already set.
void append_cell(column_id id, atomic_cell_or_collection cell);
@@ -272,6 +323,10 @@ class row {

size_t external_memory_usage() const;

+ std::optional<hash_type> cell_hash(column_id id) const;
+
+ void prepare_hash(const schema& s, column_kind kind) const;
+
friend std::ostream& operator<<(std::ostream& os, const row& r);
};

diff --git a/mutation_partition.cc b/mutation_partition.cc
index 8198f770e4..f808c55d5a 100644
--- a/mutation_partition.cc
+++ b/mutation_partition.cc
@@ -637,6 +637,30 @@ static api::timestamp_type hash_row_slice(query::digester& hasher,
return max;
}

+std::optional<row::hash_type> row::cell_hash(column_id id) const {
+ if (_type == storage_type::vector) {
+ return id < max_vector_size ? _storage.vector.hash(id) : std::nullopt;
+ }
+ auto it = _storage.set.find(id, cell_entry::compare());
+ if (it != _storage.set.end()) {
+ return it->hash();
+ }
+ return std::nullopt;
+}
+
+void row::prepare_hash(const schema& s, column_kind kind) const {
+ // const to avoid removing const qualifiers on the read path
+ for_each_cell([&] (column_id id, const atomic_cell_or_collection& c) {
+ xx_hasher cellh;
+ feed_hash(cellh, c, s.column_at(kind, id));
+ if (_type == storage_type::vector) {
+ _storage.vector.add_hash(id, cellh.finalize_uint64());
+ } else {
+ _storage.set.find(id, cell_entry::compare())->_hash = cellh.finalize_uint64();
+ }
+ });
+}
+
template<typename RowWriter>
static void get_compacted_row_slice(const schema& s,
const query::partition_slice& slice,
@@ -952,8 +976,8 @@ mutation_partition mutation_partition::sliced(const schema& s, const query::clus
return p;
}

-void
-apply_monotonically(const column_definition& def, atomic_cell_or_collection& dst, atomic_cell_or_collection& src) {
+std::optional<row::hash_type>
+row::apply_monotonically(const column_definition& def, atomic_cell_or_collection& dst, atomic_cell_or_collection& src, std::optional<hash_type> hash) {
// Must be run via with_linearized_managed_bytes() context, but assume it is
// provided via an upper layer
if (def.is_atomic()) {
@@ -961,22 +985,24 @@ apply_monotonically(const column_definition& def, atomic_cell_or_collection& dst
counter_cell_view::apply_reversibly(dst, src); // FIXME: Optimize
} else if (compare_atomic_cell_for_merge(dst.as_atomic_cell(), src.as_atomic_cell()) < 0) {
std::swap(dst, src);
+ return std::move(hash);
}
} else {
auto ct = static_pointer_cast<const collection_type_impl>(def.type);
dst = ct->merge(dst.as_collection_mutation(), src.as_collection_mutation());
}
+ return std::nullopt;
}

void
-row::apply(const column_definition& column, const atomic_cell_or_collection& value) {
+row::apply(const column_definition& column, const atomic_cell_or_collection& value, std::optional<hash_type> hash) {
auto tmp = value;
- apply_monotonically(column, std::move(tmp));
+ apply_monotonically(column, std::move(tmp), std::move(hash));
}

void
-row::apply(const column_definition& column, atomic_cell_or_collection&& value) {
- apply_monotonically(column, std::move(value));
+row::apply(const column_definition& column, atomic_cell_or_collection&& value, std::optional<hash_type> hash) {
+ apply_monotonically(column, std::move(value), std::move(hash));
}

template<typename Func>
@@ -985,8 +1011,9 @@ void row::consume_with(Func&& func) {
unsigned i = 0;
for (; i < _storage.vector.v.size(); i++) {
if (_storage.vector.present.test(i)) {
- func(i, _storage.vector.v[i]);
+ func(i, _storage.vector.v[i], _storage.vector.hash(i));
_storage.vector.present.reset(i);
+ _storage.vector.hash_present.reset(i);
--_size;
}
}
@@ -994,7 +1021,7 @@ void row::consume_with(Func&& func) {
auto del = current_deleter<cell_entry>();
auto i = _storage.set.begin();
while (i != _storage.set.end()) {
- func(i->id(), i->cell());
+ func(i->id(), i->cell(), i->hash());
i = _storage.set.erase_and_dispose(i, del);
--_size;
}
@@ -1002,7 +1029,7 @@ void row::consume_with(Func&& func) {
}

void
-row::apply_monotonically(const column_definition& column, atomic_cell_or_collection&& value) {
+row::apply_monotonically(const column_definition& column, atomic_cell_or_collection&& value, std::optional<uint64_t> hash) {
static_assert(std::is_nothrow_move_constructible<atomic_cell_or_collection>::value
&& std::is_nothrow_move_assignable<atomic_cell_or_collection>::value,
"noexcept required for atomicity");
@@ -1020,7 +1047,10 @@ row::apply_monotonically(const column_definition& column, atomic_cell_or_collect
_storage.vector.present.set(id);
_size++;
} else {
- ::apply_monotonically(column, _storage.vector.v[id], value);
+ hash = apply_monotonically(column, _storage.vector.v[id], value, std::move(hash));
+ }
+ if (hash) {
+ _storage.vector.add_hash(id, *hash);
}
} else {
if (_type == storage_type::vector) {
@@ -1032,8 +1062,9 @@ row::apply_monotonically(const column_definition& column, atomic_cell_or_collect
_storage.set.insert(i, *e);
_size++;
e->_cell = std::move(value);
+ e->_hash = std::move(hash);
} else {
- ::apply_monotonically(column, i->cell(), value);
+ i->_hash = apply_monotonically(column, i->cell(), value, std::move(hash));
}
}
}
@@ -1347,6 +1378,7 @@ void row::vector_to_set()
for (auto i : bitsets::for_each_set(_storage.vector.present)) {
auto& c = _storage.vector.v[i];
auto e = current_allocator().construct<cell_entry>(i, std::move(c));
+ e->_hash = _storage.vector.hash(i);
set.insert(set.end(), *e);
}
} catch (...) {
@@ -1361,13 +1393,14 @@ void row::vector_to_set()
_type = storage_type::set;
}

-void row::reserve(column_id last_column)
+void row::reserve(column_id last_column, size_t cached_hashes)
{
if (_type == storage_type::vector && last_column >= internal_count) {
if (last_column >= max_vector_size) {
vector_to_set();
} else {
_storage.vector.v.reserve(last_column);
+ _storage.vector.hashes.reserve(cached_hashes);
}
}
}
@@ -1432,12 +1465,12 @@ void row::apply(const schema& s, column_kind kind, const row& other) {
return;
}
if (other._type == storage_type::vector) {
- reserve(other._storage.vector.v.size() - 1);
+ reserve(other._storage.vector.v.size() - 1, other._storage.vector.hashes.size());
} else {
reserve(other._storage.set.rbegin()->id());
}
- other.for_each_cell([&] (column_id id, const atomic_cell_or_collection& cell) {
- apply(s.column_at(kind, id), cell);
+ other.for_each_cell([&] (column_id id, const atomic_cell_or_collection& cell, std::optional<hash_type> hash) {
+ apply(s.column_at(kind, id), cell, std::move(hash));
});
}

@@ -1450,12 +1483,12 @@ void row::apply_monotonically(const schema& s, column_kind kind, row&& other) {
return;
}
if (other._type == storage_type::vector) {
- reserve(other._storage.vector.v.size() - 1);
+ reserve(other._storage.vector.v.size() - 1, other._storage.vector.hashes.size());
} else {
reserve(other._storage.set.rbegin()->id());
}
- other.consume_with([&] (column_id id, atomic_cell_or_collection& cell) {
- apply_monotonically(s.column_at(kind, id), std::move(cell));
+ other.consume_with([&] (column_id id, atomic_cell_or_collection& cell, std::optional<hash_type> hash) {
+ apply_monotonically(s.column_at(kind, id), std::move(cell), std::move(hash));
});
}

--
2.15.1

It is loading more messages.
0 new messages