// The story is at
//
https://groups.google.com/forum/#!msg/scylladb-dev/sxqTHM9rSDQ/WqwF1AQDAQAJ
This is the B+ version which satisfies several specific requirements
to be suitable for row-cache usage.
1. Insert/Remove doesn't invalidate iterators
2. Elements should be LSA-compactable
3. Low overhead of data nodes (1 pointer)
4. External less-only comparator
5. As few actions on insert/delete as possible
6. Iterator walks the sorted keys
The design, briefly, is:
There are 3 types of nodes: inner, leaf and data. Inner and leaf nodes
keep a built-in array of N keys and N(+1) nodes. Leaf nodes sit in
a doubly linked list. Data nodes live separately from the leaf ones
and keep pointers on them. Tree handler keeps pointers on root and
left-most and right-most leaves. Nodes do _not_ keep pointers or
references on the tree (except 3 of them, see below).
Update in v6:
- Insertion tries to push kids to siblings before split
Before this change, insertion into a full node resulted in this
node being split into two equal parts. Under a random-keys stress
test this behaviour gives a tree with ~2/3 of nodes half-filled.
With this change before splitting the full node try to push one
element to each of the siblings (if they exist and not full).
This slows the insertion a bit (but it's still way faster than
the std::set), but gives 15% less total number of nodes.
- Iterator method to reconstruct the data at the given position
The helper creates a new data node, emplaces data into it and
replaces the iterator's one with it. Needed to keep arrays of
data in tree.
- Milli-optimize erase()
- Return back an iterator that will likely be not re-validated
- Do not try to update ancestors separation key for leftmost kid
This caused clear()-like workloads to perform poorly compared to
std::set. In particular the row_cache::invalidate() method does
exactly this and this change improves its timing.
- Perf test to measure drain speed
- Helper call to collect tree counters
Update in v5:
- Fix corner case of iterator.emplace_before()
- Clean heterogeneous lookup API
- Handle exceptions from nodes allocations
- Explicitly mark places where the key is copied (for future)
- Extend the tree.lower_bound() API to report back whether
the bound hit the key or not
- Addressed style/cleanness review comments
configure.py | 10 +
test/unit/bptree_key.hh | 101 ++
test/unit/bptree_validation.hh | 318 +++++
utils/bptree.hh | 1862 +++++++++++++++++++++++++++
test/boost/bptree_test.cc | 344 +++++
test/perf/perf_bptree.cc | 165 +++
test/perf/perf_bptree_drain.cc | 154 +++
test/unit/bptree_compaction_test.cc | 210 +++
test/unit/bptree_stress_test.cc | 236 ++++
9 files changed, 3400 insertions(+)
create mode 100644 test/unit/bptree_key.hh
create mode 100644 test/unit/bptree_validation.hh
create mode 100644 utils/bptree.hh
create mode 100644 test/boost/bptree_test.cc
create mode 100644 test/perf/perf_bptree.cc
create mode 100644 test/perf/perf_bptree_drain.cc
create mode 100644 test/unit/bptree_compaction_test.cc
create mode 100644 test/unit/bptree_stress_test.cc
diff --git a/configure.py b/configure.py
index 794c19541..c0e622d9b 100755
--- a/configure.py
+++ b/configure.py
@@ -381,6 +381,7 @@ scylla_tests = set([
'test/boost/view_schema_ckey_test',
'test/boost/vint_serialization_test',
'test/boost/virtual_reader_test',
+ 'test/boost/bptree_test',
'test/manual/ec2_snitch_test',
'test/manual/gce_snitch_test',
'test/manual/gossip',
@@ -398,6 +399,8 @@ scylla_tests = set([
'test/perf/perf_fast_forward',
'test/perf/perf_hash',
'test/perf/perf_mutation',
+ 'test/perf/perf_bptree',
+ 'test/perf/perf_bptree_drain',
'test/perf/perf_row_cache_update',
'test/perf/perf_simple_query',
'test/perf/perf_sstable',
@@ -405,6 +408,8 @@ scylla_tests = set([
'test/unit/lsa_sync_eviction_test',
'test/unit/row_cache_alloc_stress_test',
'test/unit/row_cache_stress_test',
+ 'test/unit/bptree_stress_test',
+ 'test/unit/bptree_compaction_test',
])
perf_tests = set([
@@ -939,6 +944,7 @@ pure_boost_tests = set([
'test/boost/small_vector_test',
'test/boost/top_k_test',
'test/boost/vint_serialization_test',
+ 'test/boost/bptree_test',
'test/manual/json_test',
'test/manual/streaming_histogram_test',
])
@@ -952,10 +958,14 @@ tests_not_using_seastar_test_framework = set([
'test/perf/perf_cql_parser',
'test/perf/perf_hash',
'test/perf/perf_mutation',
+ 'test/perf/perf_bptree',
+ 'test/perf/perf_bptree_drain',
'test/perf/perf_row_cache_update',
'test/unit/lsa_async_eviction_test',
'test/unit/lsa_sync_eviction_test',
'test/unit/row_cache_alloc_stress_test',
+ 'test/unit/bptree_stress_test',
+ 'test/unit/bptree_compaction_test',
'test/manual/sstable_scan_footprint_test',
]) | pure_boost_tests
diff --git a/test/unit/bptree_key.hh b/test/unit/bptree_key.hh
new file mode 100644
index 000000000..54347a54f
--- /dev/null
+++ b/test/unit/bptree_key.hh
@@ -0,0 +1,101 @@
+/*
+ * Copyright (C) 2020 ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see <
http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+/*
+ * Test key used to check that the tree
+ *  - works with keys that have no default constructor
+ *  - moves the keys around properly
+ *
+ * Every key owns two heap-allocated cookies:
+ *  - _cookie is handed over on move, so a moved-from key ends up with
+ *    a null _cookie and a _val of -1 and is reported as not alive
+ *  - _p_cookie is allocated by every constructor and is expected to be
+ *    non-null when the key is destroyed
+ */
+class test_key {
+    int _val;
+    int* _cookie;
+    int* _p_cookie;
+
+public:
+    // A fresh key: the cookie holds 0, the per-object cookie holds 1.
+    explicit test_key(int nr) : _val(nr), _cookie(new int(0)), _p_cookie(new int(1)) { }
+
+    // Moving steals the value and the cookie and marks the source as reset.
+    test_key(test_key&& other) noexcept
+            : _val(other._val)
+            , _cookie(other._cookie)
+            , _p_cookie(new int(*other._p_cookie)) {
+        other._val = -1;
+        other._cookie = nullptr;
+    }
+
+    test_key& operator=(const test_key& other) = delete;
+    test_key& operator=(test_key&& other) = delete;
+
+    ~test_key() {
+        // _cookie is null for a moved-from key, deleting null is a no-op
+        delete _cookie;
+        assert(_p_cookie != nullptr);
+        delete _p_cookie;
+    }
+
+    operator int() const { return _val; }
+
+    bool less(const test_key& o) const {
+        return _val < o._val;
+    }
+
+    /*
+     * Reports (and prints the reason) whether the key was moved-from
+     * or its cookie no longer holds the expected zero.
+     */
+    bool is_alive() const {
+        const bool value_reset = (_val == -1);
+        if (value_reset) {
+            fmt::print("key value is reset\n");
+            return false;
+        }
+
+        const bool cookie_reset = (_cookie == nullptr);
+        if (cookie_reset) {
+            fmt::print("key cookie is reset\n");
+            return false;
+        }
+
+        const int cookie = *_cookie;
+        if (cookie != 0) {
+            fmt::print("key cookie value is corrupted {}\n", cookie);
+            return false;
+        }
+
+        return true;
+    }
+
+private:
+    /*
+     * Keep this private to make bptree.hh explicitly call the
+     * copy_key in the places where the key is copied
+     */
+    test_key(const test_key& other) noexcept
+            : _val(other._val)
+            , _cookie(new int(*other._cookie))
+            , _p_cookie(new int(*other._p_cookie)) { }
+
+    friend test_key copy_key(const test_key&);
+};
+
+/*
+ * The only sanctioned way to copy a test_key (its copy constructor is
+ * private). Defined inline because this lives in a header: a non-inline
+ * definition would violate the one-definition rule as soon as the header
+ * is included from more than one translation unit.
+ */
+inline test_key copy_key(const test_key& other) { return test_key(other); }
+
+// Less-only comparator for test_key, forwards to test_key::less()
+struct test_key_compare {
+    bool operator()(const test_key& a, const test_key& b) const {
+        return a.less(b);
+    }
+};
diff --git a/test/unit/bptree_validation.hh b/test/unit/bptree_validation.hh
new file mode 100644
index 000000000..cdf137bda
--- /dev/null
+++ b/test/unit/bptree_validation.hh
@@ -0,0 +1,318 @@
+/*
+ * Copyright (C) 2020 ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see <
http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+namespace bplus {
+
+/*
+ * Walks the debug flavour of the tree (key_search::both, with_debug::yes)
+ * and checks its internal consistency. Also knows how to dump the tree
+ * to stdout for post-mortem reading.
+ */
+template <typename K, typename T, typename Less, int NodeSize>
+class validator {
+    using tree = class tree<K, T, Less, NodeSize, key_search::both, with_debug::yes>;
+    using node = typename tree::node;
+
+    void validate_node(const tree& t, const node& n, int& prev, int& min, bool is_root);
+    void validate_list(const tree& t);
+
+public:
+    void validate(const tree& t);
+
+    // Prints the ids of the left leaf, root and right leaf, then the whole tree
+    void print_tree(const tree& t, char pfx) const {
+        fmt::print("/ {} <- | {} | -> {}\n", t._left->id(), t._root->id(), t._right->id());
+        print_node(*t._root, pfx, 2);
+        fmt::print("\\\n");
+    }
+
+    /*
+     * Prints one node prefixed by pfx padded to indent columns. A leaf
+     * gets its keys listed on the same line, an inner node gets its
+     * kids printed recursively (2 columns deeper) with the separation
+     * keys in between.
+     */
+    void print_node(const node& n, char pfx, int indent) const {
+        fmt::print("{:<{}c}{:s} {:d} ({:d} keys, {:x} flags):", pfx, indent,
+                n.is_leaf() ? "leaf" : "node",
+                n.id(), n._num_keys, n._flags);
+
+        if (n.is_leaf()) {
+            for (int k = 0; k < n._num_keys; k++) {
+                fmt::print(" {}", (int)n._keys[k].v);
+            }
+            fmt::print("\n");
+            return;
+        }
+
+        fmt::print("\n");
+        if (n._kids[0].n != nullptr) {
+            print_node(*n._kids[0].n, pfx, indent + 2);
+        }
+        for (int k = 0; k < n._num_keys; k++) {
+            fmt::print("{:<{}c}---{}---\n", pfx, indent, (int)n._keys[k].v);
+            print_node(*n._kids[k + 1].n, pfx, indent + 2);
+        }
+    }
+};
+
+
+/*
+ * Recursively validates the sub-tree rooted at n. Throws a string
+ * literal naming the first broken invariant.
+ *
+ * prev_key is in/out: the largest key seen so far in the in-order
+ * walk, it is advanced on every leaf key visited.
+ * min_key is out: set to the first key of a non-empty leaf; for an
+ * inner node it is forwarded to the recursion into the first kid.
+ * is_root tells whether n is expected to carry the root flag.
+ */
+template <typename K, typename T, typename L, int NS>
+void validator<K, T, L, NS>::validate_node(const tree& t, const node& n, int& prev_key, int& min_key, bool is_root) {
+    int i;
+
+    if (n.is_root() != is_root) {
+        fmt::print("node {} needs to {} root, but {}\n",
+                n.id(), is_root ? "be" : "be not", n._flags);
+        throw "root broken";
+    }
+
+    for (i = 0; i < n._num_keys; i++) {
+        if (!n._keys[i].v.is_alive()) {
+            fmt::print("node {} key {} is not alive\n",
+                    n.id(), i);
+            throw "key dead";
+        }
+    }
+
+    if (n.is_leaf()) {
+        for (i = 0; i < n._num_keys; i++) {
+            if (t._less(n._keys[i].v, K(prev_key))) {
+                fmt::print("node misordered @{} (prev {})\n", (int)n._keys[i].v, prev_key);
+                throw "misorder";
+            }
+            // Data for key i lives at kid i + 1 and must point back to this leaf
+            if (n._kids[i + 1].d->_leaf != &n) {
+                fmt::print("data mispoint\n");
+                throw "data backlink";
+            }
+
+            prev_key = n._keys[i].v;
+            if (!n._kids[i + 1].d->value.match_key(n._keys[i].v)) {
+                fmt::print("node value corrupted @{:d}.{:d}\n",
+                        n.id(), i);
+                throw "data corruption";
+            }
+        }
+
+        if (n._num_keys > 0) {
+            min_key = (int)n._keys[0].v;
+        }
+    } else if (n._num_keys > 0) {
+        node* k = n._kids[0].n;
+
+        if (k->_parent != &n) {
+            // The parent may be null here, print -1 instead of crashing
+            // on it (same as the check for the other kids below)
+            fmt::print("node {:d} -parent-> {:d}, expect {:d}\n",
+                    k->id(), k->_parent ? k->_parent->id() : -1,
+                    n.id());
+            throw "mis-parented node";
+        }
+        validate_node(t, *k, prev_key, min_key, false);
+        for (i = 0; i < n._num_keys; i++) {
+            k = n._kids[i + 1].n;
+            if (k->_parent != &n) {
+                fmt::print("node {:d} -parent-> {:d}, expect {:d}\n",
+                        k->id(), k->_parent ? k->_parent->id() : -1,
+                        n.id());
+                throw "mis-parented node";
+            }
+            if (t._less(k->_keys[0].v, n._keys[i].v)) {
+                fmt::print("node {:d}.{:d}, separation key {}, kid has {}\n",
+                        n.id(), k->id(),
+                        (int)n._keys[i].v, (int)k->_keys[0].v);
+                throw "separation key mismatch";
+            }
+
+            int min = 0;
+            validate_node(t, *k, prev_key, min, false);
+            // The separation key differs from the minimal key of the sub-tree
+            if (t._less(n._keys[i].v, K(min)) || t._less(K(min), n._keys[i].v)) {
+                fmt::print("node {:d}.[{:d}]{:d}, separation key {}, min {}\n",
+                        n.id(), i, k->id(), (int)n._keys[i].v, min);
+                // A separation key below the minimum is fatal only in strict mode,
+                // one above the minimum is always fatal
+                if (strict_separation_key || t._less(K(min), n._keys[i].v)) {
+                    throw "separation key screw";
+                }
+            }
+        }
+    }
+}
+
+/*
+ * Validates the list of leaves: the tree's left/right pointers and
+ * flags, the prev/next links between neighbours, the back-pointers
+ * from the edge leaves to the tree, the key ordering across leaves
+ * and the agreement between the list and the parents' kids arrays.
+ * Throws "list broken" on the first mismatch found.
+ */
+template <typename K, typename T, typename L, int NS>
+void validator<K, T, L, NS>::validate_list(const tree& t) {
+ // The last key of the previously visited leaf
+ int prev = 0;
+
+ // Edge leaves as found by descending from the root
+ node* lh = t.left_leaf_slow();
+ node* rh = t.right_leaf_slow();
+
+ if (lh != t._left) {
+ fmt::print("left {:d}, slow {:d}\n", t._left->id(), lh->id());
+ throw "list broken";
+ }
+
+ if (!(lh->_flags & node::NODE_LEFTMOST)) {
+ fmt::print("left {:d} is not marked as such {}\n", t._left->id(), t._left->_flags);;
+ throw "list broken";
+ }
+
+ if (rh != t._right) {
+ fmt::print("right {:d}, slow {:d}\n", t._right->id(), rh->id());
+ throw "list broken";
+ }
+
+ if (!(rh->_flags & node::NODE_RIGHTMOST)) {
+ fmt::print("right {:d} is not marked as such {}\n", t._right->id(), t._right->_flags);;
+ throw "list broken";
+ }
+
+ // Walk the leaves left to right until the rightmost one is processed
+ node* r = lh;
+ while (1) {
+ node *ln;
+
+ // Forward link: the next leaf must point back, the rightmost one points to the tree
+ if (!r->is_rightmost()) {
+ ln = r->get_next();
+ if (ln->get_prev() != r) {
+ fmt::print("next leaf {:d} points to {:d}, expect {:d}\n", ln->id(), ln->get_prev()->id(), r->id());
+ throw "list broken";
+ }
+ } else if (r->_rightmost_tree != &t) {
+ fmt::print("right leaf doesn't point to tree\n");
+ throw "list broken";
+ }
+
+ // Backward link: same check in the other direction
+ if (!r->is_leftmost()) {
+ ln = r->get_prev();
+ if (ln->get_next() != r) {
+ fmt::print("prev leaf {:d} points to {:d}, expect {:d}\n", ln->id(), ln->get_next()->id(), r->id());
+ throw "list broken";
+ }
+ } else if (r->_kids[0]._leftmost_tree != &t) {
+ fmt::print("left leaf doesn't point to tree\n");
+ throw "list broken";
+ }
+
+ if (r->_num_keys > 0 && t._less(r->_keys[0].v, K(prev))) {
+ fmt::print("list misorder on element {:d}, keys {}..., prev {:d}\n", r->id(), (int)r->_keys[0].v, prev);
+ throw "list broken";
+ }
+
+ // Siblings under the same parent must also be neighbours in the list
+ if (!r->is_root() && r->_parent != nullptr) {
+ const auto p = r->_parent;
+ // NOTE(review): reads r->_keys[0] without checking r->_num_keys, i.e.
+ // assumes a non-root leaf is never empty -- confirm
+ int i = p->index_for(r->_keys[0].v, t._less);
+ if (i > 0) {
+ if (p->_kids[i - 1].n != r->get_prev()) {
+ fmt::print("list misorder on parent check: node {:d}.{:d}, parent prev {:d}, list prev {:d}\n",
+ p->id(), r->id(), p->_kids[i - 1].n->id(), r->get_prev()->id());
+ throw "list broken";
+ }
+ }
+ // NOTE(review): kid i + 1 exists whenever i < p->_num_keys, so this bound
+ // looks like it skips the check for the last-but-one kid -- confirm intent
+ if (i < p->_num_keys - 1) {
+ if (p->_kids[i + 1].n != r->get_next()) {
+ fmt::print("list misorder on parent check: node {:d}.{:d}, parent next {:d}, list next {:d}\n",
+ p->id(), r->id(), p->_kids[i + 1].n->id(), r->get_next()->id());
+ throw "list broken";
+ }
+ }
+ }
+
+ if (r->_num_keys > 0) {
+ prev = (int)r->_keys[r->_num_keys - 1].v;
+ }
+
+ // Only the edge leaves may carry the leftmost/rightmost flags
+ if (r != t._left && r != t._right && (r->_flags & (node::NODE_LEFTMOST | node::NODE_RIGHTMOST))) {
+ fmt::print("middle {:d} is marked as left/right {}\n", r->id(), r->_flags);;
+ throw "list broken";
+ }
+
+ if (r->is_rightmost()) {
+ break;
+ }
+
+ r = r->get_next();
+ }
+}
+
+/*
+ * Entry point: validates the leaves list, the root back-pointer and
+ * then the nodes recursively. On any failure dumps the tree and the
+ * ids of the leaves in list order, then re-throws.
+ */
+template <typename K, typename T, typename L, int NS>
+void validator<K, T, L, NS>::validate(const tree& t) {
+    try {
+        validate_list(t);
+
+        if (t._root->_root_tree != &t) {
+            fmt::print("root doesn't point to tree\n");
+            throw "root broken";
+        }
+
+        int min = 0;
+        int prev = 0;
+        validate_node(t, *t._root, prev, min, true);
+    } catch (...) {
+        print_tree(t, '|');
+        fmt::print("[ ");
+        for (node* leaf = t._left; ; leaf = leaf->get_next()) {
+            fmt::print(" {:d}", leaf->id());
+            if (leaf->is_rightmost()) {
+                break;
+            }
+        }
+        fmt::print("]\n");
+        throw;
+    }
+}
+
+/*
+ * Keeps a forward iterator on the tree and advances it step by step
+ * while the tree is being modified, checking that the iterator stays
+ * usable and that the values it yields do not go backwards.
+ */
+template <typename K, typename T, typename Less, int NodeSize>
+class iterator_checker {
+    using tree = class tree<K, T, Less, NodeSize, key_search::both, with_debug::yes>;
+
+    validator<K, T, Less, NodeSize>& _tv;
+    tree& _t;
+    typename tree::iterator _fwd, _fend;
+    /*
+     * The value seen on the previous step. Value-initialized so that
+     * the very first comparison in forward_check() does not read an
+     * indeterminate value when T is trivially constructible.
+     */
+    T _fprev{};
+
+public:
+    iterator_checker(validator<K, T, Less, NodeSize>& tv, tree& t) : _tv(tv), _t(t),
+            _fwd(t.begin()), _fend(t.end()) {
+    }
+
+    /*
+     * Makes one step forward. Returns false once the iterator is at
+     * (or next to) the end. On a failed check prints the tree and
+     * re-throws.
+     */
+    bool step() {
+        try {
+            return forward_check();
+        } catch(...) {
+            _tv.print_tree(_t, ':');
+            throw;
+        }
+    }
+
+    // Tells whether the iterator currently stands on the value matching k
+    bool here(const K& k) {
+        return _fwd != _fend && _fwd->match_key(k);
+    }
+
+private:
+    /*
+     * Advances by one, then goes one more forward and back again to
+     * check that ++ followed by -- lands on the same value, and that
+     * this value is not less than the one from the previous step.
+     */
+    bool forward_check() {
+        if (_fwd == _fend) {
+            return false;
+        }
+        _fwd++;
+        if (_fwd == _fend) {
+            return false;
+        }
+        T val = *_fwd;
+        _fwd++;
+        if (_fwd == _fend) {
+            return false;
+        }
+        _fwd--;
+        if (val != *_fwd) {
+            fmt::print("Iterator broken, {:d} != {:d}\n", val, *_fwd);
+            throw "iterator";
+        }
+        if (val < _fprev) {
+            fmt::print("Iterator broken, {:d} < {:d}\n", val, _fprev);
+            throw "iterator";
+        }
+        _fprev = val;
+
+        return true;
+    }
+};
+
+} // namespace
+
diff --git a/utils/bptree.hh b/utils/bptree.hh
new file mode 100644
index 000000000..165b61ced
--- /dev/null
+++ b/utils/bptree.hh
@@ -0,0 +1,1862 @@
+/*
+ * Copyright (C) 2020 ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see <
http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include <boost/intrusive/parent_from_member.hpp>
+#include <seastar/util/defer.hh>
+#include <cassert>
+#include "utils/logalloc.hh"
+
+namespace bplus {
+
+enum class with_debug { no, yes };
+
+/*
+ * Linear search in a sorted array of keys slightly beats the
+ * binary one on small sizes. For debugging purposes both methods
+ * should be used (and the result must coincide).
+ */
+enum class key_search { linear, binary, both };
+
+/*
+ * The node_id class is a debugging aid only -- validator printouts
+ * are easier to read with IDs of 1-3 digits than with the 16 hex
+ * digits of a printed pointer.
+ *
+ * The generic flavour stores nothing and reports the object's own
+ * address converted to int.
+ */
+template <with_debug D>
+struct node_id {
+    int operator()() const {
+        const auto addr = reinterpret_cast<uintptr_t>(this);
+        return addr;
+    }
+};
+
+/*
+ * Debug flavour: every instance takes the next value of a
+ * process-wide counter that starts at 1.
+ */
+template <>
+struct node_id<with_debug::yes> {
+    unsigned int _id;
+
+    static unsigned int _next() {
+        static std::atomic<unsigned int> rover {1};
+        // post-increment on std::atomic returns the previous value
+        return rover++;
+    }
+
+    node_id() : _id(_next()) {}
+
+    int operator()() const { return _id; }
+};
+
+/*
+ * Storage for a Value that is neither constructed together with its
+ * container nor destructed together with it. Meant for elements of
+ * static arrays (or containers with .emplace() methods) that may live
+ * for some time without a value inside.
+ *
+ * The owner is responsible for the value's lifetime:
+ *  - .emplace(args...) constructs the value in an empty slot
+ *  - .emplace(maybe&&) move-constructs it from another alive slot
+ *    and destroys the source value
+ *  - .replace(args...) destroys the alive value and constructs a new
+ *    one in its place
+ *  - .reset() calls ~Value() by hand
+ */
+template <typename Value>
+union maybe {
+    Value v;
+
+    // Neither constructs nor destroys the value, see above
+    maybe() noexcept {}
+    ~maybe() {}
+
+    // Destroys the alive value, the slot becomes empty
+    void reset() { v.~Value(); }
+
+    // Constructs the value inside the empty slot
+    template <typename... Args>
+    void emplace(Args&&... args) {
+        new (&v) Value (std::forward<Args>(args)...);
+    }
+
+    // Moves the value out of another alive slot, leaving that one empty
+    void emplace(maybe&& other) {
+        Value& src = other.v;
+        new (&v) Value(std::move(src));
+        other.reset();
+    }
+
+    // Same as emplace(args...), but for a slot that holds an alive value
+    template <typename... Args>
+    void replace(Args&&... args) {
+        v.~Value();
+        emplace(std::forward<Args>(args)...);
+    }
+
+    void replace(maybe&& other) = delete; // not to be called by chance
+};
+
+// No-op disposer, the default for the .{do_something_with_data}_and_dispose methods below
+template <typename T>
+void default_dispose(T&) { }
+
+/*
+ * Helper to explicitly capture all keys copying: the tree code calls
+ * copy_key() instead of invoking the copy constructor directly, so
+ * every place where a key is copied is easy to find. A key type may
+ * provide its own copy_key() overload.
+ * Check test_key for more information.
+ *
+ * Returns a copy of other made with Key's copy constructor, which is
+ * required to be nothrow.
+ */
+template <typename Key>
+GCC6_CONCEPT(requires std::is_nothrow_copy_constructible_v<Key>)
+Key copy_key(const Key& other) {
+ return Key(other);
+}
+
+/*
+ * Consider a small 2-level tree like this
+ *
+ * [ . 5 . ]
+ * | |
+ * +------+ +-----+
+ * | |
+ * [ 1 . 2 . 3 . ] [ 5 . 6 . 7 . ]
+ *
+ * And we remove key 5 from it. First -- the key is removed
+ * from the leaf entry
+ *
+ * [ . 5 . ]
+ * | |
+ * +------+ +-----+
+ * | |
+ * [ 1 . 2 . 3 . ] [ 6 . 7. ]
+ *
+ * At this point we have a choice -- whether or not to update
+ * the separation key on the parent (root). Strictly speaking,
+ * the whole tree is correct now -- all the keys on the right
+ * are greater-or-equal than their separation key, though the
+ * "equal" never happens.
+ *
+ * This can be problematic if the keys are stored on data nodes
+ * and are referenced from the (non-)leaf nodes. In this case
+ * the separation key must be updated to point to some real key
+ * in its sub-tree.
+ *
+ * [ . 6 . ] <--- this key updated
+ * | |
+ * +------+ +-----+
+ * | |
+ * [ 1 . 2 . 3 . ] [ 6 . 7. ]
+ *
+ * As this update takes some time, this behaviour is tunable.
+ *
+ */
+constexpr bool strict_separation_key = true;
+
+/*
+ * This is for testing, validator will be everybody's friend
+ * to have rights to check if the tree is internally correct.
+ */
+template <typename Key, typename T, typename Less, int NodeSize> class validator;
+template <with_debug Debug> class statistics;
+
+template <typename Key, typename T, typename Less, int NodeSize, key_search Search, with_debug Debug> class node;
+template <typename Key, typename T, typename Less, int NodeSize, key_search Search, with_debug Debug> class data;
+
+/*
+ * The tree itself.
+ * Equipped with O(1) (with little constant) begin() and end()
+ * and the iterator, that scans through sorted keys and is not
+ * invalidated on insert/remove.
+ *
+ * The NodeSize parameter describes the amount of keys to be
+ * held on each node. Inner nodes will thus have N+1 sub-trees,
+ * leaf nodes will have N data pointers.
+ */
+
+/*
+ * Constraints used by the tree, written in the Concepts TS syntax.
+ * NOTE(review): GCC6_CONCEPT is defined elsewhere, presumably it drops
+ * its argument when concepts are not supported -- confirm.
+ *
+ * LessComparable: less(a, b) and less(b, a) are both valid for a Key1
+ * and a Key2 and yield bool, i.e. the comparator works in either order.
+ *
+ * CanGetKeyFromValue: the value has a .key() method yielding Key.
+ */
+GCC6_CONCEPT(
+ template <typename Key1, typename Key2, typename Less>
+ concept bool LessComparable = requires (const Key1& a, const Key2& b, Less less) {
+ { less(a, b) } -> bool;
+ { less(b, a) } -> bool;
+ };
+
+ template <typename T, typename Key>
+ concept bool CanGetKeyFromValue = requires (T val) {
+ { val.key() } -> Key;
+ };
+)
+
+/*
+ * Plain bag of tree counters.
+ * NOTE(review): the code filling these in is not in this part of the
+ * file, the per-field notes below are read off the names -- confirm.
+ */
+struct stats {
+ // presumably the number of inner nodes
+ unsigned long nodes;
+ // presumably a histogram of inner nodes by the number of keys they hold
+ std::vector<unsigned long> nodes_filled;
+ // presumably the number of leaf nodes
+ unsigned long leaves;
+ // presumably a histogram of leaf nodes by the number of keys they hold
+ std::vector<unsigned long> leaves_filled;
+ // presumably the number of data nodes
+ unsigned long datas;
+};
+
+template <typename Key, typename T, typename Less, int NodeSize,
+ key_search Search = key_search::binary, with_debug Debug = with_debug::no>
+GCC6_CONCEPT( requires LessComparable<Key, Key, Less> &&
+ std::is_nothrow_move_constructible_v<Key> &&
+ std::is_nothrow_move_constructible_v<T>
+)
+class tree {
+public:
+ class iterator;
+ friend class validator<Key, T, Less, NodeSize>;
+ friend class node<Key, T, Less, NodeSize, Search, Debug>;
+
+ // Sanity not to allow slow key-search in non-debug mode
+ static_assert(Debug == with_debug::yes || Search != key_search::both);
+
+ using node = class node<Key, T, Less, NodeSize, Search, Debug>;
+ using data = class data<Key, T, Less, NodeSize, Search, Debug>;
+
+private:
+
+ node* _root = nullptr;
+ node* _left = nullptr;
+ node* _right = nullptr;
+ Less _less;
+
+    /*
+     * Descends from the root down to a leaf, at every inner node
+     * following the kid that index_for() reports for the key.
+     * The tree must not be empty.
+     */
+    template <typename K>
+    node& find_leaf_for(const K& k) const {
+        for (node* at = _root; ; ) {
+            if (at->is_leaf()) {
+                return *at;
+            }
+            const int kid = at->index_for(k, _less);
+            at = at->_kids[kid].n;
+        }
+    }
+
+    /*
+     * Lazily creates the very first node. It is a leaf that is the
+     * root, the leftmost and the rightmost node at the same time.
+     */
+    void maybe_init_empty_tree() {
+        if (_root == nullptr) {
+            node* first = node::create();
+            first->_flags |= node::NODE_LEAF | node::NODE_ROOT | node::NODE_RIGHTMOST | node::NODE_LEFTMOST;
+            do_set_root(first);
+            do_set_left(first);
+            do_set_right(first);
+        }
+    }
+
+    // Finds the leftmost leaf by walking down the 0th kids from the root
+    node* left_leaf_slow() const {
+        node* at;
+        for (at = _root; !at->is_leaf(); at = at->_kids[0].n) { }
+        return at;
+    }
+
+    // Finds the rightmost leaf by walking down the last kids from the root
+    node* right_leaf_slow() const {
+        node* at;
+        for (at = _root; !at->is_leaf(); at = at->_kids[at->_num_keys].n) { }
+        return at;
+    }
+
+    /*
+     * Common part of lower_bound() and upper_bound().
+     *
+     * The upper argument is in/out. On entry it selects the flavour:
+     * true for the upper bound, false for the lower one. When the
+     * lower bound is requested and it hits the key exactly, upper is
+     * set to true on exit (thus it becomes the 'match' flag).
+     */
+    template <typename K>
+    iterator get_bound(const K& k, bool& upper) {
+        maybe_init_empty_tree();
+
+        node& leaf = find_leaf_for(k);
+        const int pos = leaf.index_for(k, _less);
+
+        /*
+         * The element at pos (its key is at pos - 1) is less than or
+         * equal to k, the next one is greater. Corner cases first.
+         */
+        if (pos == 0) {
+            assert(leaf.is_leftmost());
+            return begin();
+        }
+
+        if (pos > leaf._num_keys) {
+            assert(leaf.is_rightmost());
+            return end();
+        }
+
+        iterator it(leaf._kids[pos].d, pos);
+        if (upper || _less(leaf._keys[pos - 1].v, k)) {
+            // The element is not the bound, the next one is
+            ++it;
+        } else {
+            // Here 'upper' becomes 'match'
+            upper = true;
+        }
+
+        return it;
+    }
+
+public:
+
+ tree(const tree& other) = delete;
+ const tree& operator=(const tree& other) = delete;
+ tree& operator=(tree&& other) = delete;
+
+ explicit tree(Less less) : _less(less) { }
+ ~tree() {
+ if (_root != nullptr) {
+ node::destroy(*_root);
+ }
+ }
+
+ Less less() const { return _less; }
+
+    /*
+     * Takes over the nodes of the other tree (if it has any) and
+     * re-points the root and the edge leaves to this tree. The other
+     * tree is left empty.
+     */
+    tree(tree&& other) noexcept : _less(std::move(other._less)) {
+        if (!other._root) {
+            return;
+        }
+
+        do_set_root(other._root);
+        do_set_left(other._left);
+        do_set_right(other._right);
+
+        other._root = other._left = other._right = nullptr;
+    }
+
+    // XXX -- linear scan: sums up the number of keys over all the leaves
+    size_t size_slow() const {
+        size_t total = 0;
+
+        if (_root != nullptr) {
+            for (const node* leaf = _left; ; leaf = leaf->get_next()) {
+                assert(leaf->is_leaf());
+                total += leaf->_num_keys;
+                if (leaf == _right) {
+                    break;
+                }
+            }
+        }
+
+        return total;
+    }
+
+    // Returns the element equal to k (neither is less than the other) or end()
+    template <typename K = Key>
+    GCC6_CONCEPT(requires LessComparable<K, Key, Less>)
+    iterator find(const K& k) {
+        maybe_init_empty_tree();
+
+        node& leaf = find_leaf_for(k);
+        const int pos = leaf.index_for(k, _less);
+
+        // Nothing at or below k in this leaf, or the candidate is strictly less
+        if (pos < 1 || _less(leaf._keys[pos - 1].v, k)) {
+            return end();
+        }
+
+        return iterator(leaf._kids[pos].d, pos);
+    }
+
+    // Returns the least x out of those !less(x, k)
+    template <typename K = Key>
+    GCC6_CONCEPT(requires LessComparable<K, Key, Less>)
+    iterator lower_bound(const K& k) {
+        bool ignored_match = false;
+        return get_bound(k, ignored_match);
+    }
+
+    // Same as above, additionally reports via match whether the bound hit the key k
+    template <typename K = Key>
+    GCC6_CONCEPT(requires LessComparable<K, Key, Less>)
+    iterator lower_bound(const K& k, bool& match) {
+        // false on entry asks get_bound() for the lower bound flavour
+        match = false;
+        return get_bound(k, match);
+    }
+
+    // Returns the least x out of those less(k, x)
+    template <typename K = Key>
+    GCC6_CONCEPT(requires LessComparable<K, Key, Less>)
+    iterator upper_bound(const K& k) {
+        bool want_upper = true;
+        return get_bound(k, want_upper);
+    }
+
+    /*
+     * Constructs the element with key k from args inside the tree.
+     * Returns the iterator on the element and true. If the key is
+     * already there nothing is constructed: the result is the
+     * iterator on the existing element and false.
+     */
+    template <typename... Args>
+    std::pair<iterator, bool> emplace(Key k, Args&&... args) {
+        maybe_init_empty_tree();
+
+        node& leaf = find_leaf_for(k);
+        const int pos = leaf.index_for(k, _less);
+
+        const bool direct_hit = pos >= 1 && !_less(leaf._keys[pos - 1].v, k);
+        if (direct_hit) {
+            return std::pair(iterator(leaf._kids[pos].d, pos), false);
+        }
+
+        data* d = data::create(std::forward<Args>(args)...);
+        // Frees the data if the insertion below throws
+        auto rollback = seastar::defer([&d] { data::destroy(*d, default_dispose<T>); });
+        leaf.insert(pos, std::move(k), d, _less);
+        assert(d->attached());
+        rollback.cancel();
+
+        return std::pair(iterator(d, pos + 1), true);
+    }
+
+    /*
+     * Removes the element with key k, calls disp on its value and
+     * returns the iterator on the next element. Returns end() if
+     * there is no such key.
+     */
+    template <typename Func>
+    GCC6_CONCEPT(requires requires (Func f, T val) { { f(val) } -> void; } )
+    iterator erase_and_dispose(const Key& k, Func&& disp) {
+        maybe_init_empty_tree();
+
+        node& leaf = find_leaf_for(k);
+        const int key_pos = leaf.index_for(k, _less) - 1;
+
+        if (key_pos < 0) {
+            return end();
+        }
+
+        assert(leaf._num_keys > 0);
+
+        // The closest key from below is strictly less -- no match
+        if (_less(leaf._keys[key_pos].v, k)) {
+            return end();
+        }
+
+        data* victim = leaf._kids[key_pos + 1].d;
+
+        // Step to the next element before the victim leaves the leaf
+        iterator next(victim, key_pos + 1);
+        ++next;
+
+        leaf.remove(key_pos, _less);
+        data::destroy(*victim, disp);
+
+        return next;
+    }
+
+    /*
+     * Removes the elements in the [from, to) range one by one calling
+     * disp on each value. Returns to.
+     *
+     * FIXME this is dog slow k*logN algo, need k+logN one
+     */
+    template <typename Func>
+    GCC6_CONCEPT(requires requires (Func f, T val) { { f(val) } -> void; } )
+    iterator erase_and_dispose(iterator from, iterator to, Func&& disp) {
+        for (; from != to; from = from.erase_and_dispose(disp, _less)) { }
+        return to;
+    }
+
+    // Any flavour of erase_and_dispose() with the no-op disposer appended
+    template <typename... Args>
+    iterator erase(Args&&... args) {
+        return erase_and_dispose(std::forward<Args>(args)..., default_dispose<T>);
+    }
+
+    /*
+     * Destroys all the nodes calling disp on every value, then frees
+     * the root. The tree is left empty. No-op on an empty tree.
+     */
+    template <typename Func>
+    GCC6_CONCEPT(requires requires (Func f, T val) { { f(val) } -> void; } )
+    void clear_and_dispose(Func&& disp) {
+        if (_root == nullptr) {
+            return;
+        }
+
+        _root->clear(
+            [this, &disp] (data* d) { data::destroy(*d, disp); },
+            [this] (node* n) { node::destroy(*n); }
+        );
+
+        node::destroy(*_root);
+        _root = _left = _right = nullptr;
+    }
+
+    // Empties the tree, the values get the no-op disposer
+    void clear() {
+        clear_and_dispose(default_dispose<T>);
+    }
+
+private:
+    // Links the tree and its leftmost leaf to each other
+    void do_set_left(node *n) {
+        assert(n->is_leftmost());
+        n->_kids[0]._leftmost_tree = this;
+        _left = n;
+    }
+
+    // Links the tree and its rightmost leaf to each other
+    void do_set_right(node *n) {
+        assert(n->is_rightmost());
+        n->_rightmost_tree = this;
+        _right = n;
+    }
+
+    // Links the tree and its root node to each other
+    void do_set_root(node *n) {
+        assert(n->is_root());
+        _root = n;
+        n->_root_tree = this;
+    }
+
+public:
+ /*
+ * Iterator. Scans the datas in the sorted-by-key order.
+ * Is not invalidated by emplace/erase-s of other elements.
+ * Move constructors may turn the _idx invalid, but the
+ * .revalidate() method makes it good again.
+ */
+ class iterator {
+ friend class tree;
+
+ /*
+ * When the iterator gets to the end the _data is
+ * replaced with the _tree obtained from the right
+ * leaf, and the _idx is set to npos
+ */
+ union {
+ tree* _tree;
+ data* _data;
+ };
+ int _idx;
+
+ /*
+ * It could be 0 as well, as leaf nodes cannot have
+ * kids (data nodes) at 0 position, but ...
+ */
+ static constexpr int npos = -1;
+
+ bool is_end() const { return _idx == npos; }
+
+ explicit iterator(tree* t) : _tree(t), _idx(npos) { }
+ iterator(data* d, int idx) : _data(d), _idx(idx) { }
+
+        /*
+         * Makes sure the iterator's index is valid and returns the
+         * leaf that holds the data. Must not be called on end().
+         */
+        node* revalidate() {
+            assert(!is_end());
+
+            node* owner = _data->_leaf;
+
+            /*
+             * The data._leaf pointer is always valid (it's updated
+             * on insert/remove operations) and the datas do not move,
+             * so the index is good if the leaf still has the data at
+             * it. Otherwise look the data up on the leaf.
+             */
+            const bool idx_is_good = _idx <= owner->_num_keys && owner->_kids[_idx].d == _data;
+            if (!idx_is_good) {
+                _idx = owner->index_for(_data);
+            }
+
+            return owner;
+        }
+
+ public:
+ using iterator_category = std::bidirectional_iterator_tag;
+ using value_type = T;
+ using difference_type = ssize_t;
+ using pointer = value_type*;
+ using reference = value_type&;
+
+        /*
+         * Makes an iterator out of a pointer to a value that lives in
+         * the tree. Two things are needed for that:
+         * - the pointer to the class data: the value is assumed to be
+         *   the data::value member, so the enclosing object is found
+         *   with the "container_of" maneuver
+         * - the index at which the leaf sees the data: found by the
+         *   standard revalidation. The initial guess is 1, which has
+         *   1/NodeSize chance of being right at once :)
+         */
+        iterator(T* value)
+                : _data(boost::intrusive::get_parent_from_member(value, &data::value))
+                , _idx(1) {
+            revalidate();
+        }
+
+ iterator() : iterator(static_cast<tree*>(nullptr)) {}
+
+ reference operator*() const { return _data->value; }
+ pointer operator->() const { return &_data->value; }
+
+        /*
+         * Steps to the next element: the next kid of the same leaf,
+         * the first kid of the next leaf, or end() when already on
+         * the last kid of the rightmost leaf.
+         */
+        iterator& operator++() {
+            node* leaf = revalidate();
+
+            if (_idx < leaf->_num_keys) {
+                _data = leaf->_kids[++_idx].d;
+                return *this;
+            }
+
+            if (leaf->is_rightmost()) {
+                // Turn into end(): the union now carries the tree
+                _idx = npos;
+                _tree = leaf->_rightmost_tree;
+                return *this;
+            }
+
+            leaf = leaf->get_next();
+            _idx = 1;
+            _data = leaf->_kids[1].d;
+            return *this;
+        }
+
+        /*
+         * Steps to the previous element. From end() goes to the last
+         * element of the rightmost leaf, which must not be empty.
+         */
+        iterator& operator--() {
+            if (is_end()) {
+                node* last = _tree->_right;
+                assert(last->_num_keys > 0);
+                _data = last->_kids[last->_num_keys].d;
+                _idx = last->_num_keys;
+                return *this;
+            }
+
+            node* leaf = revalidate();
+            if (_idx <= 1) {
+                // On the first kid -- continue from the tail of the previous leaf
+                leaf = leaf->get_prev();
+                _idx = leaf->_num_keys;
+            } else {
+                _idx--;
+            }
+            _data = leaf->_kids[_idx].d;
+            return *this;
+        }
+
+        // Post-increment: advances and returns the position it was at
+        iterator operator++(int) {
+            iterator before(*this);
+            ++*this;
+            return before;
+        }
+
+        // Post-decrement: steps back and returns the position it was at
+        iterator operator--(int) {
+            iterator before(*this);
+            --*this;
+            return before;
+        }
+
+ // An end iterator equals only another end iterator, any other
+ // pair is compared by the data nodes they stand on
+ bool operator==(const iterator& o) const {
+     if (is_end()) {
+         return o.is_end();
+     }
+     return _data == o._data;
+ }
+ bool operator!=(const iterator& o) const {
+     return !operator==(o);
+ }
+
+ /*
+ * Constructs a new data from args and inserts it right before the
+ * element the iterator stands on, or after the last element of the
+ * rightmost leaf if this is the end iterator. The key callable is
+ * called with the new data and its result is used as the key.
+ * Returns an iterator for the new element.
+ *
+ * The key _MUST_ be in order and not exist,
+ * neither of those is checked
+ */
+ template <typename KeyFn, typename... Args>
+ iterator emplace_before(KeyFn key, Less less, Args&&... args) {
+ node* leaf;
+ int i;
+
+ if (!is_end()) {
+ leaf = revalidate();
+ // On leaves the data at kids[_idx] goes with the key at index _idx - 1
+ i = _idx - 1;
+
+ if (i == 0 && !leaf->is_leftmost()) {
+ /*
+ * If we're about to insert a key before the 0th one, then
+ * we must make sure the separation keys from upper layers
+ * will separate the new key as well. If they won't then we
+ * should select the left sibling for insertion.
+ *
+ * For !strict_separation_key the solution is simple -- the
+ * upper level separation keys match the current 0th one, so
+ * we always switch to the left sibling.
+ *
+ * If we're already on the left-most leaf -- just insert, as
+ * there's no separation key above it.
+ */
+ if (!strict_separation_key) {
+ assert(false && "Not implemented");
+ }
+ leaf = leaf->get_prev();
+ i = leaf->_num_keys;
+ }
+ } else {
+ _tree->maybe_init_empty_tree();
+ leaf = _tree->_right;
+ i = leaf->_num_keys;
+ }
+
+ assert(i >= 0);
+
+ data* d = data::create(std::forward<Args>(args)...);
+ // Guard that destroys the new data unless cancelled, i.e. if the insertion throws
+ auto x = seastar::defer([&d] { data::destroy(*d, default_dispose<T>); });
+ leaf->insert(i, std::move(key(d)), d, less);
+ assert(d->attached());
+ x.cancel();
+ /*
+ * XXX -- if the node was not split we can ++ it index
+ * and keep iterator valid :)
+ */
+ return iterator(d, i);
+ }
+
+ // Same as the generic emplace_before(), but with the key given explicitly
+ template <typename... Args>
+ iterator emplace_before(Key k, Less less, Args&&... args) {
+     // The data pointer is not needed, the key is just handed over
+     auto give_key = [&k] (data*) -> Key { return std::move(k); };
+     return emplace_before(std::move(give_key), less, std::forward<Args>(args)...);
+ }
+
+ // Same as the generic emplace_before(), but the key is obtained
+ // from the newly constructed value via its key() method
+ template <typename... Args>
+ GCC6_CONCEPT(requires CanGetKeyFromValue<T, Key>)
+ iterator emplace_before(Less less, Args&&... args) {
+     auto key_of_value = [] (data* d) -> Key { return d->value.key(); };
+     return emplace_before(std::move(key_of_value), less, std::forward<Args>(args)...);
+ }
+
+ private:
+ /*
+  * Builds an iterator for the element that follows the current
+  * one, assuming the current one is about to be removed from
+  * the leaf.
+  *
+  * It works like operator++ with one difference -- if the next
+  * element sits on the same leaf the _idx is kept as is, because
+  * the removal shifts that element left into the current slot.
+  * The result is only "likely" valid: if the removal rebalances
+  * datas the index will need revalidation.
+  */
+ iterator next_after_erase(node* leaf) {
+     if (_idx >= leaf->_num_keys) {
+         // Current element is the last one on the leaf
+         if (leaf->is_rightmost()) {
+             return iterator(leaf->_rightmost_tree);
+         }
+
+         node* following = leaf->get_next();
+         return iterator(following->_kids[1].d, 1);
+     }
+
+     return iterator(leaf->_kids[_idx + 1].d, _idx);
+ }
+
+ public:
+ // Removes the element under the iterator from its leaf, destroys
+ // the data with the disp disposer and returns the iterator for the
+ // next element (prepared before the removal takes place)
+ template <typename Func>
+ iterator erase_and_dispose(Func&& disp, Less less) {
+     node* owner = revalidate();
+     iterator following = next_after_erase(owner);
+
+     // Keys are indexed one less than the matching datas
+     owner->remove(_idx - 1, less);
+     data::destroy(*_data, disp);
+
+     return following;
+ }
+
+ // erase_and_dispose() with the default disposer
+ iterator erase(Less less) {
+     return erase_and_dispose(default_dispose<T>, less);
+ }
+
+ /*
+ * Replaces the data node under the iterator with a new one. The
+ * new node is allocated with new_size bytes and constructed from
+ * args, then it takes the old node's slot on the leaf and the old
+ * node is destroyed. The iterator points to the new data afterwards.
+ */
+ template <typename... Args>
+ void reconstruct(size_t new_size, Args&&... args) {
+ node* leaf = revalidate();
+ auto ptr = current_allocator().alloc(&get_standard_migrator<data>(), new_size, alignof(data));
+ data *dat, *cur = _data;
+
+ try {
+ dat = new (ptr) data(std::forward<Args>(args)...);
+ } catch(...) {
+ // Construction failed, give the raw memory back and let the caller know
+ current_allocator().free(ptr, new_size);
+ throw;
+ }
+
+ // Hand the leaf link over to the new node, the old one must
+ // not be attached by the time it's destroyed
+ dat->_leaf = leaf;
+ cur->_leaf = nullptr;
+
+ _data = dat;
+ leaf->_kids[_idx].d = dat;
+
+ current_allocator().destroy(cur);
+ }
+
+ // Storage size as reported by the data node under the iterator
+ size_t storage_size() const {
+     return _data->storage_size();
+ }
+ };
+
+ // Iterator for the first element, which is the first data
+ // on the leftmost leaf, or end() if the tree has no keys
+ iterator begin() {
+     const bool no_keys = (_root == nullptr) || (_root->_num_keys == 0);
+     if (no_keys) {
+         return end();
+     }
+
+     assert(_left->_num_keys > 0);
+     // Datas on leaves are kept starting from index 1
+     return iterator(_left->_kids[1].d, 1);
+ }
+ // The end iterator is built from the tree pointer
+ iterator end() {
+     return iterator(this);
+ }
+
+ // Reverse iteration is built on top of the forward iterator
+ using reverse_iterator = std::reverse_iterator<iterator>;
+ reverse_iterator rbegin() {
+     return reverse_iterator(end());
+ }
+ reverse_iterator rend() {
+     return reverse_iterator(begin());
+ }
+
+ // Collects tree counters by walking the whole tree from the root.
+ // For a tree without a root all counters are zero and the
+ // per-fill vectors are left untouched.
+ struct stats get_stats() {
+     struct stats st;
+
+     st.nodes = 0;
+     st.leaves = 0;
+     st.datas = 0;
+
+     if (_root == nullptr) {
+         return st;
+     }
+
+     // One slot per possible number of keys on a node, 0 through NodeSize
+     st.nodes_filled.resize(NodeSize + 1);
+     st.leaves_filled.resize(NodeSize + 1);
+     _root->fill_stats(st);
+
+     return st;
+ }
+};
+
+
+/*
+ * A node describes both, inner and leaf nodes.
+ */
+template <typename Key, typename T, typename Less, int NodeSize, key_search Search, with_debug Debug>
+class node final {
+ friend class validator<Key, T, Less, NodeSize>;
+ friend class tree<Key, T, Less, NodeSize, Search, Debug>;
+
+ using tree = class tree<Key, T, Less, NodeSize, Search, Debug>;
+ using data = class data<Key, T, Less, NodeSize, Search, Debug>;
+
+ class prealloc;
+
+ /*
+ * The NodeHalf is the level at which the node is considered
+ * to be underflown and should be re-filled. This slightly
+ * differs for even and odd sizes.
+ *
+ * For odd sizes the node will stand until it contains literally
+ * more than 1/2 of its size (e.g. for size 5 keeping 3 keys
+ * is OK). For even cases this barrier is less than the actual
+ * half (e.g. for size 4 keeping 2 is still OK).
+ */
+ static constexpr int NodeHalf = ((NodeSize - 1) / 2);
+ static_assert(NodeHalf >= 1);
+
+ union node_or_data_or_tree {
+ node* n;
+ data* d;
+
+ tree* _leftmost_tree; // See comment near node::__next about this
+ };
+
+ using node_or_data = node_or_data_or_tree;
+
+ friend data::data(data&&);
+
+ [[no_unique_address]] node_id<Debug> id;
+
+ unsigned short _num_keys;
+ unsigned short _flags;
+
+ static const unsigned short NODE_ROOT = 0x1;
+ static const unsigned short NODE_LEAF = 0x2;
+ static const unsigned short NODE_LEFTMOST = 0x4;
+ static const unsigned short NODE_RIGHTMOST = 0x8;
+
+ // Tests for the individual bits of _flags
+ bool is_leaf() const {
+     return (_flags & NODE_LEAF) != 0;
+ }
+ bool is_root() const {
+     return (_flags & NODE_ROOT) != 0;
+ }
+ bool is_rightmost() const {
+     return (_flags & NODE_RIGHTMOST) != 0;
+ }
+ bool is_leftmost() const {
+     return (_flags & NODE_LEFTMOST) != 0;
+ }
+
+ /*
+ * separation keys
+ * non-leaf nodes:
+ * the kids[i] contains keys[i - 1] <= k < keys[i]
+ * kids[0] contains keys < all keys in the node
+ * leaf nodes:
+ * kids[i + 1] is the data for keys[i]
+ * kids[0] is unused
+ *
+ * In the examples below the leaf nodes will be shown like
+ *
+ * keys: [012]
+ * datas: [-012]
+ *
+ * and the non-leaf ones like
+ *
+ * keys: [012]
+ * kids: [A012]
+ *
+ * to have digits correspond to different elements and staying
+ * in its correct positions. And the A kid is this left-most one
+ * at index 0 for the non-leaf node.
+ */
+
+ /*
+ * Wrap keys in array with "maybe" not to default-construct yet
+ * unused key in it on node allocation.
+ */
+
+ maybe<Key> _keys[NodeSize];
+ node_or_data _kids[NodeSize + 1];
+
+ /*
+ * The root node uses this to point to the tree object. This is
+ * needed to update tree->_root on node move.
+ */
+ union {
+ node* _parent;
+ tree* _root_tree;
+ };
+
+ /*
+ * Leaf nodes are linked in a list, since leaf nodes do
+ * not use the _kids[0] pointer we re-use it. Respectively,
+ * non-leaf nodes don't use the __next one.
+ *
+ * Also, leftmost and rightmost respectively have prev and
+ * next pointing to the tree object itself. This is done for
+ * _left/_right update on node move.
+ */
+ union {
+ node* __next;
+ tree* _rightmost_tree;
+ };
+
+ // Accessors for the leaves list links, all of them are for leaf nodes only.
+ // The next link is the __next field.
+ node* get_next() const {
+ assert(is_leaf());
+ return __next;
+ }
+
+ void set_next(node *n) {
+ assert(is_leaf());
+ __next = n;
+ }
+
+ // The prev link is kept in the _kids[0] slot, which leaves don't use for data
+ node* get_prev() const {
+ assert(is_leaf());
+ return _kids[0].n;
+ }
+
+ void set_prev(node* n) {
+ assert(is_leaf());
+ _kids[0].n = n;
+ }
+
+ // Puts the node n into the leaves list right behind this one. If
+ // this was the rightmost leaf, n takes the flag over and the tree's
+ // right pointer is switched to it.
+ void link(node& n) {
+     if (!is_rightmost()) {
+         node* after = get_next();
+         n.set_next(after);
+         after->set_prev(&n);
+     } else {
+         _flags &= ~NODE_RIGHTMOST;
+         n._flags |= node::NODE_RIGHTMOST;
+         tree* owner = _rightmost_tree;
+         assert(owner->_right == this);
+         owner->do_set_right(&n);
+     }
+
+     n.set_prev(this);
+     set_next(&n);
+ }
+
+ /*
+ * Removes the leaf from the leaves list. If the leaf was the
+ * left- or the right-most one, its neighbour takes the respective
+ * flag over and the tree's left/right pointer is updated. On exit
+ * both links of this node point to the node itself.
+ */
+ void unlink() {
+ node* x;
+ tree* t;
+
+ switch (_flags & (node::NODE_LEFTMOST | node::NODE_RIGHTMOST)) {
+ case node::NODE_LEFTMOST:
+ x = get_next();
+ _flags &= ~node::NODE_LEFTMOST;
+ x->_flags |= node::NODE_LEFTMOST;
+ // For the leftmost leaf the prev slot holds the tree pointer
+ t = _kids[0]._leftmost_tree;
+ assert(t->_left == this);
+ t->do_set_left(x);
+ break;
+ case node::NODE_RIGHTMOST:
+ x = get_prev();
+ _flags &= ~node::NODE_RIGHTMOST;
+ x->_flags |= node::NODE_RIGHTMOST;
+ // For the rightmost leaf the next slot holds the tree pointer
+ t = _rightmost_tree;
+ assert(t->_right == this);
+ t->do_set_right(x);
+ break;
+ case 0:
+ // Leaf in the middle of the list, make the neighbours point to each other
+ get_prev()->set_next(get_next());
+ get_next()->set_prev(get_prev());
+ break;
+ default:
+ /*
+ * Right- and left-most at the same time can only be root,
+ * otherwise this would mean we have root with 0 keys.
+ */
+ assert(false);
+ }
+
+ set_next(this);
+ set_prev(this);
+ }
+
+ node(const node& other) = delete;
+ const node& operator=(const node& other) = delete;
+ node& operator=(node&& other) = delete;
+
+ /*
+  * Finds the tree the node belongs to. Neither nodes nor datas
+  * keep a pointer or reference to the tree, because all of those
+  * would have to be updated by the tree move constructor making
+  * it a toooo slow linear operation. So the way to the tree is to
+  * climb the ._parent chain up to the root node, which keeps the
+  * tree pointer in its _root_tree.
+  */
+ tree* tree_slow() const {
+     const node* n = this;
+
+     for (; !n->is_root(); n = n->_parent) {
+         // just climbing
+     }
+
+     return n->_root_tree;
+ }
+
+ /*
+  * Returns the index i of the subtree the key k belongs to, so
+  * that key[i - 1] <= k < key[i]. The i == 0 result means that
+  * the node is inner and the key is in its leftmost subtree.
+  * The search algorithm is selected by the Search parameter.
+  */
+ template <typename K>
+ int index_for(const K& k, Less less) const {
+     using search_tag = std::integral_constant<key_search, Search>;
+     return index_for(k, less, search_tag{});
+ }
+
+ // Runs both the linear and the binary search and asserts that they agree
+ template <typename K>
+ int index_for(const K& k, Less less, std::integral_constant<key_search, key_search::both>) const {
+     const int by_linear = index_for(k, less, std::integral_constant<key_search, key_search::linear>());
+     const int by_binary = index_for(k, less, std::integral_constant<key_search, key_search::binary>());
+     assert(by_linear == by_binary);
+     return by_linear;
+ }
+
+ /*
+  * Binary search flavour of index_for(). Returns the index of the
+  * first key on the node that k is less than, or _num_keys if there
+  * is no such key. Expects the keys on the node to be sorted.
+  */
+ template <typename K>
+ int index_for(const K& k, Less less, std::integral_constant<key_search, key_search::binary>) const {
+     // [s, e] is the range of key indices not yet compared with k
+     int s = 0, e = _num_keys - 1;
+
+     while (s <= e) {
+         int i = (s + e) / 2;
+         if (less(k, _keys[i].v)) {
+             e = i - 1;
+         } else {
+             s = i + 1;
+         }
+     }
+
+     return s;
+ }
+
+ // Linear search flavour of index_for(): walks the keys from the start
+ // and stops on the first one k is less than
+ template <typename K>
+ int index_for(const K& k, Less less, std::integral_constant<key_search, key_search::linear>) const {
+     int pos = 0;
+
+     while (pos < _num_keys && !less(k, _keys[pos].v)) {
+         pos++;
+     }
+
+     return pos;
+ }
+
+ // Finds the index of the kid node n in the _kids array with a
+ // linear scan, the kid is asserted to be there.
+ // Keep index on kid (FIXME?)
+ int index_for(node *n) const {
+     int pos = 0;
+
+     while (pos <= _num_keys && _kids[pos].n != n) {
+         pos++;
+     }
+
+     assert(pos <= _num_keys);
+     return pos;
+ }
+
+ // The node is underflown and should be re-filled
+ bool need_refill() const {
+     return _num_keys <= NodeHalf;
+ }
+
+ // The node can give one key away and still keep more than NodeHalf of them
+ bool can_grab_from() const {
+     return _num_keys > NodeHalf + 1;
+ }
+
+ // The node has room for at least one more key
+ bool can_push_to() const {
+     return _num_keys < NodeSize;
+ }
+
+ // Keys of both nodes fit one node. Merging inner nodes
+ // takes one more slot for the separation key.
+ bool can_merge_with(const node& n) const {
+     const int for_separator = is_leaf() ? 0 : 1;
+     return _num_keys + n._num_keys + for_separator <= NodeSize;
+ }
+
+ // Opens a slot at key index s by moving keys [s, _num_keys) and the
+ // matching kids one position to the right. The _num_keys is increased,
+ // the opened slot is to be filled by the caller.
+ void shift_right(int s) {
+ // Go from the tail so that nothing is overwritten before being moved
+ for (int i = _num_keys - 1; i >= s; i--) {
+ _keys[i + 1].emplace(std::move(_keys[i]));
+ _kids[i + 2] = _kids[i + 1];
+ }
+ _num_keys++;
+ }
+
+ // Closes the slot at key index s by moving the keys and kids that
+ // follow it one position to the left. The _num_keys is decreased.
+ void shift_left(int s) {
+ // The key at s is expected to be .remove()-d !
+ for (int i = s; i < _num_keys - 1; i++) {
+ _keys[i].emplace(std::move(_keys[i + 1]));
+ _kids[i + 1] = _kids[i + 2];
+ }
+ _num_keys--;
+ }
+
+ // Moves count keys starting from index foff into the to node starting
+ // from index toff. The kid at position i + 1 goes along with the key at
+ // position i, so kids[0] is never touched. Neither node's _num_keys is
+ // updated and the moved kids' back pointers are not fixed here.
+ void move_keys_and_kids(int foff, node& to, int toff, int count) {
+ for (int i = 0; i < count; i++) {
+ to._keys[toff + i].emplace(std::move(_keys[foff + i]));
+ to._kids[toff + i + 1] = _kids[foff + i + 1];
+ }
+ }
+
+ // Moves count keys (and the matching kids) starting from the off
+ // position into the head of the to node. This node is left with
+ // off keys, the to one gets count of them. The moved kids are
+ // re-pointed to their new owner.
+ void move_to(node& to, int off, int count) {
+     move_keys_and_kids(off, to, 0, count);
+     _num_keys = off;
+     to._num_keys = count;
+
+     const bool leaf = is_leaf();
+     for (int kid = 1; kid <= count; kid++) {
+         if (leaf) {
+             to._kids[kid].d->reattach(&to);
+         } else {
+             to._kids[kid].n->_parent = &to;
+         }
+     }
+ }
+
+ void grab_from_left(node& from, maybe<Key>& sep) {
+ /*
+ * Grab one element from the left sibling and return
+ * the new separation key for them.
+ *
+ * Leaf: just move the last key (and the last kid) and report
+ * it as new separation key
+ *
+ * keys: [012] -> [56] = [01] [256] 2 is new separation
+ * datas: [-012] -> [-56] = [-01] [-256]
+ *
+ * Non-leaf is trickier. We need the current separation key
+ * as we're grabbing the last element which has no the right
+ * boundary on the node. So the parent node tells us one.
+ *
+ * keys: [012] -> s [56] = [01] 2 [s56] 2 is new separation
+ * kids: [A012] -> [B56] = [A01] [2B56]
+ */
+
+ // Index of the last key on the left sibling
+ int i = from._num_keys - 1;
+
+ // Make room at the head of this node, this also accounts for the new key
+ shift_right(0);
+ from._num_keys--;
+
+ if (is_leaf()) {
+ _keys[0].emplace(std::move(from._keys[i]));
+ _kids[1] = from._kids[i + 1];
+ _kids[1].d->reattach(this);
+ sep.replace(std::move(copy_key(_keys[0].v)));
+ } else {
+ // The old separation key comes down, the grabbed kid becomes the 0th one
+ _keys[0].emplace(std::move(sep));
+ _kids[1] = _kids[0];
+ _kids[0] = from._kids[i + 1];
+ _kids[0].n->_parent = this;
+ sep.emplace(std::move(from._keys[i]));
+ }
+ }
+
+ void merge_into(node& t, Key key) {
+ /*
+ * Merge current node into t preparing it for being
+ * killed. This merge is slightly different for leaves
+ * and for non-leaves wrt the 0th element.
+ *
+ * Non-leaves. For those we need the separation key, which
+ * is passed to us. The caller "knows" that this and t are
+ * two siblings and thus the separation key is the one from
+ * the parent node. For this reason merging two non-leaf
+ * nodes needs one more slot in the target as compared to
+ * the leaf-nodes case.
+ *
+ * keys: [012] + K + [456] = [012K456]
+ * kids: [A012] + [B456] = [A012B456]
+ *
+ * Leaves. This is simple -- just go ahead and merge.
+ *
+ * keys: [012] + [456] = [012456]
+ * datas: [-012] + [-456] = [-012456]
+ */
+
+ if (!t.is_leaf()) {
+ // The separation key is appended to t and our 0th kid goes along with it
+ int i = t._num_keys;
+ t._keys[i].emplace(std::move(key));
+ t._kids[i + 1] = _kids[0];
+ t._kids[i + 1].n->_parent = &t;
+ t._num_keys++;
+ }
+
+ move_keys_and_kids(0, t, t._num_keys, _num_keys);
+
+ // Re-point the kids that have just been moved to their new owner
+ if (t.is_leaf()) {
+ for (int i = t._num_keys; i < t._num_keys + _num_keys; i++) {
+ t._kids[i + 1].d->reattach(&t);
+ }
+ } else {
+ for (int i = t._num_keys; i < t._num_keys + _num_keys; i++) {
+ t._kids[i + 1].n->_parent = &t;
+ }
+ }
+
+ t._num_keys += _num_keys;
+ // This node is left without keys
+ _num_keys = 0;
+ }
+
+ void grab_from_right(node& from, maybe<Key>& sep) {
+ /*
+ * Grab one element from the right sibling and return
+ * the new separation key for them.
+ *
+ * Leaf: just move the 0th key (and 1st kid) and the
+ * new separation key is what becomes 0 in the source.
+ *
+ * keys: [01] <- [456] = [014] [56] 5 is new separation
+ * datas: [-01] <- [-456] = [-014] [-56]
+ *
+ * Non-leaf is trickier. We need the current separation
+ * key as we're grabbing the kids[0] element which has no
+ * corresponding keys[-1]. So the parent node tells us one.
+ *
+ * keys: [01] <- s [456] = [01s] 4 [56] 4 is new separation
+ * kids: [A01] <- [B456] = [A01B] [456]
+ */
+
+ // The grabbed element is appended to the tail of this node
+ int i = _num_keys;
+
+ if (is_leaf()) {
+ _keys[i].emplace(std::move(from._keys[0]));
+ _kids[i + 1] = from._kids[1];
+ _kids[i + 1].d->reattach(this);
+ // The source is not yet shifted, so its future 0th key is still at index 1
+ sep.replace(std::move(copy_key(from._keys[1].v)));
+ } else {
+ _kids[i + 1] = from._kids[0];
+ _kids[i + 1].n->_parent = this;
+ _keys[i].emplace(std::move(sep));
+ from._kids[0] = from._kids[1];
+ sep.emplace(std::move(from._keys[0]));
+ }
+
+ _num_keys++;
+ from.shift_left(0);
+ }
+
+ /*
+  * Helper for assert() after a split. The two halves should be
+  * almost equal, i.e. their sizes may differ by one at most (it
+  * depends on the node size being odd or even and on the node
+  * being leaf or inner). Non-debug trees skip the check.
+  */
+ bool equally_split(const node& n2) const {
+     if (Debug != with_debug::yes) {
+         return true;
+     }
+
+     const int mine = _num_keys;
+     const int other = n2._num_keys;
+     return mine == other || mine == other + 1 || mine + 1 == other;
+ }
+
+ // Helper for assert(), see the comment in do_insert for details.
+ // For debug trees checks that the key k is not less than the last
+ // key of the inner node's leftmost kid. Reports true in all the
+ // cases when the check doesn't apply.
+ bool left_kid_sorted(const Key& k, Less less) const {
+     if (Debug != with_debug::yes || is_leaf() || _num_keys == 0) {
+         return true;
+     }
+
+     node* leftmost = _kids[0].n;
+     if (leftmost == nullptr) {
+         return true;
+     }
+
+     return !less(k, leftmost->_keys[leftmost->_num_keys - 1].v);
+ }
+
+ /*
+ * Disposes everything that sits below the node. On leaves the ddisp
+ * is called for every data. On inner nodes every kid is cleared
+ * recursively and then handed over to ndisp. The keys are reset and
+ * the node is left with zero of them. Leaves additionally drop the
+ * leftmost/rightmost flags and get self-linked.
+ */
+ template <typename DFunc, typename NFunc>
+ GCC6_CONCEPT(requires
+ requires (DFunc f, data* val) { { f(val) } -> void; } &&
+ requires (NFunc f, node* n) { { f(n) } -> void; }
+ )
+ void clear(DFunc&& ddisp, NFunc&& ndisp) {
+ if (is_leaf()) {
+ _flags &= ~(node::NODE_LEFTMOST | node::NODE_RIGHTMOST);
+ set_next(this);
+ set_prev(this);
+ } else {
+ // The 0th kid has no matching key, so it's handled out of the loop below
+ node* n = _kids[0].n;
+ n->clear(ddisp, ndisp);
+ ndisp(n);
+ }
+
+ for (int i = 0; i < _num_keys; i++) {
+ _keys[i].reset();
+ if (is_leaf()) {
+ ddisp(_kids[i + 1].d);
+ } else {
+ node* n = _kids[i + 1].n;
+ n->clear(ddisp, ndisp);
+ ndisp(n);
+ }
+ }
+
+ _num_keys = 0;
+ }
+
+ // Constructs a new empty node with the current allocator
+ static node* create() {
+ return current_allocator().construct<node>();
+ }
+
+ // Destroys the node with the current allocator
+ static void destroy(node& n) {
+ current_allocator().destroy(&n);
+ }
+
+ // Destroys a non-root node. Leaves are taken out of the leaves list first.
+ void drop() {
+ assert(!is_root());
+ if (is_leaf()) {
+ unlink();
+ }
+ destroy(*this);
+ }
+
+ /*
+ * Inserts the k:nd pair at index idx into a node that has no free
+ * slots. A non-root node first tries to push one element to each of
+ * its siblings and inserts in place if this freed a slot, in which
+ * case the preallocated nodes are not needed and are drained. If no
+ * room was made, or the node is the root, the node is split.
+ */
+ void insert_into_full(int idx, Key k, node_or_data nd, Less less, prealloc& nodes) {
+ if (!is_root()) {
+ node& p = *_parent;
+ // Position of this node among the parent's kids
+ int i = p.index_for(_keys[0].v, less);
+
+ /*
+ * Try to push left or right existing keys to the respective
+ * siblings. Keep in mind two corner cases:
+ *
+ * 1. Push to left. In this case the new key should not go
+ * to the [0] element, otherwise we'd have to update the p's
+ * separation key one more time.
+ *
+ * 2. Push to right. In this case we must make sure the new
+ * key is not the rightmost itself, otherwise it's _him_ who
+ * must be pushed there.
+ *
+ * Both corner cases are possible to implement though.
+ */
+ if (idx > 1 && i > 0) {
+ node* left = p._kids[i - 1].n;
+ if (left->can_push_to()) {
+ /*
+ * We've moved the 0th element from this, so the index
+ * for the new key shifts too
+ */
+ idx--;
+ left->grab_from_right(*this, p._keys[i - 1]);
+ }
+ }
+
+ if (idx < _num_keys && i < p._num_keys) {
+ node* right = p._kids[i + 1].n;
+ if (right->can_push_to()) {
+ right->grab_from_left(*this, p._keys[i]);
+ }
+ }
+
+ if (_num_keys < NodeSize) {
+ do_insert(idx, std::move(k), nd, less);
+ nodes.drain();
+ return;
+ }
+
+ /*
+ * We can only get here if both ->can_push_to() checks above
+ * had failed. In this case -- go ahead and split this.
+ */
+ }
+
+ split_and_insert(idx, std::move(k), nd, less, nodes);
+ }
+
+ /*
+ * Splits the full node into two and inserts the k:nd pair at index
+ * idx into one of the halves. The new sibling is popped from the
+ * preallocated nodes and is then inserted into the parent, or into
+ * a new root if this node is the root one.
+ */
+ void split_and_insert(int idx, Key k, node_or_data nd, Less less, prealloc& nodes) {
+ assert(_num_keys >= NodeSize);
+
+ node* nn = nodes.pop();
+ maybe<Key> sep;
+
+ /*
+ * Insertion with split.
+ * 1. Existing node (this) is split into two. We try a bit harder
+ * than strictly needed to make the split equal.
+ * 2. The new element is added to either of the resulting nodes.
+ * 3. The new node nn is inserted into parent one with the help
+ * of a separation key sep
+ *
+ * First -- find the position in the current node at which the
+ * new element should have appeared.
+ */
+
+ int off = NodeHalf + (idx > NodeHalf ? 1 : 0);
+
+ if (is_leaf()) {
+ nn->_flags |= NODE_LEAF;
+ link(*nn);
+
+ /*
+ * Split of leaves. This is simple -- just copy the needed
+ * amount of keys and kids from this to nn, then insert the
+ * new pair into the proper place. When inserting the new
+ * node into parent the separation key is the one latter
+ * starts with.
+ *
+ * keys: [01234]
+ * datas: [-01234]
+ *
+ * if the new key is below 2, then
+ * keys: -> [01] [234] -> [0n1] [234] -> sep is 2
+ * datas: -> [-01] [-234] -> [-0n1] [-234]
+ *
+ * if the new key is above 2, then
+ * keys: -> [012] [34] -> [012] [3n4] -> sep is 3 (or n)
+ * datas: -> [-012] [-34] -> [-012] [-3n4]
+ */
+ move_to(*nn, off, NodeSize - off);
+
+ if (idx <= NodeHalf) {
+ do_insert(idx, std::move(k), nd, less);
+ } else {
+ nn->do_insert(idx - off, std::move(k), nd, less);
+ }
+ sep.emplace(std::move(copy_key(nn->_keys[0].v)));
+ } else {
+ /*
+ * Node insertion has one special case -- when the new key
+ * gets directly into the middle.
+ */
+ if (idx == NodeHalf + 1) {
+ /*
+ * Split of nodes and the new key is in the middle. In this
+ * we need to split the node into two, but take the k as the
+ * separation key. The corresponding data becomes new node's
+ * 0 kid.
+ *
+ * keys: [012345] -> [012] k [345] (and the k goes up)
+ * kids: [A012345] -> [A012] [n345]
+ */
+ move_to(*nn, off, NodeSize - off);
+ sep.emplace(std::move(k));
+ nn->_kids[0] = nd;
+ nn->_kids[0].n->_parent = nn;
+ } else {
+ /*
+ * Split of nodes and the new key gets into either of the
+ * halves. This is like leaves split, but we need to carefully
+ * handle the kids[0] for both. The corresponding key is not
+ * on the node and "has" an index of -1 and thus becomes the
+ * separation one for the upper layer.
+ *
+ * keys: [012345]
+ * datas: [A012345]
+ *
+ * if the new key goes left then
+ * keys: -> [01] 2 [345] -> [0n1] 2 [345]
+ * datas: -> [A01] [2345] -> [A0n1] [2345]
+ *
+ * if the new key goes right then
+ * keys: -> [012] 3 [45] -> [012] 3 [4n5]
+ * datas: -> [A012] [345] -> [A012] [34n5]
+ */
+ move_to(*nn, off + 1, NodeSize - off - 1);
+ // The key at off goes up as the separation one, its kid becomes nn's 0th
+ sep.emplace(std::move(_keys[off]));
+ nn->_kids[0] = _kids[off + 1];
+ nn->_kids[0].n->_parent = nn;
+ _num_keys--;
+
+ if (idx <= NodeHalf) {
+ do_insert(idx, std::move(k), nd, less);
+ } else {
+ nd.n->_parent = nn;
+ nn->do_insert(idx - off - 1, std::move(k), nd, less);
+ }
+ }
+ }
+
+ assert(equally_split(*nn));
+
+ if (is_root()) {
+ insert_into_root(*nn, std::move(sep.v), nodes);
+ } else {
+ insert_into_parent(*nn, std::move(sep.v), less, nodes);
+ }
+ sep.reset();
+ }
+
+ // Inserts the k:nd pair at key index i into a node that has a free
+ // slot. On leaves the data gets attached to the node.
+ void do_insert(int i, Key k, node_or_data nd, Less less) {
+ assert(_num_keys < NodeSize);
+
+ /*
+ * The new k:nd pair should be put into the given index and
+ * shift offenders to the right. However, if it should be
+ * put left to the non-leaf's left-most node -- it's a BUG,
+ * as there's no corresponding key here.
+ *
+ * Non-leaf nodes get here when their kids are split, and
+ * when they do, if the kid gets into the left-most sub-tree,
+ * it's directly put there, and this helper is not called.
+ * Said that, if we're inserting a new pair, the newbie can
+ * only get to the right of the left-most kid.
+ */
+ assert(i != 0 || left_kid_sorted(k, less));
+
+ shift_right(i);
+
+ // Puts k:nd pair at position idx (keys[idx] and kids[idx + 1])
+ _keys[i].emplace(std::move(k));
+ _kids[i + 1] = nd;
+ if (is_leaf()) {
+ nd.d->attach(*this);
+ }
+ }
+
+ // Hooks the new sibling nn (the result of this node's split) into this
+ // node's parent, with sep being the key that separates nn from this.
+ // The parent may in turn need a split, that's what the nodes are for.
+ void insert_into_parent(node& nn, Key sep, Less less, prealloc& nodes) {
+     nn._parent = _parent;
+     // Standard designated initializer instead of the obsolete GNU "n:" form
+     _parent->insert_key(std::move(sep), node_or_data{.n = &nn}, less, nodes);
+ }
+
+ // Grows the tree one level up after the root node split. A new root
+ // is popped from the preallocated nodes, this and nn become its two
+ // kids separated by the sep key, and the tree is switched to it.
+ void insert_into_root(node& nn, Key sep, prealloc& nodes) {
+ // Pick the tree pointer before _parent is written below, they share the storage
+ tree* t = _root_tree;
+
+ node* nr = nodes.pop();
+
+ nr->_num_keys = 1;
+ nr->_keys[0].emplace(std::move(sep));
+ nr->_kids[0].n = this;
+ nr->_kids[1].n = &nn;
+ _flags &= ~node::NODE_ROOT;
+ _parent = nr;
+ nn._parent = nr;
+
+ nr->_flags |= node::NODE_ROOT;
+ t->do_set_root(nr);
+ }
+
+ // Inserts the k:nd pair at the position found by the key lookup
+ void insert_key(Key k, node_or_data nd, Less less, prealloc& nodes) {
+     const int pos = index_for(k, less);
+     insert(pos, std::move(k), nd, less, nodes);
+ }
+
+ // Inserts the k:nd pair at index i, taking the slow path if the node is full
+ void insert(int i, Key k, node_or_data nd, Less less, prealloc& nodes) {
+     if (_num_keys != NodeSize) {
+         do_insert(i, std::move(k), nd, less);
+         return;
+     }
+
+     insert_into_full(i, std::move(k), nd, less, nodes);
+ }
+
+ /*
+  * Inserts the data d with the key k at index i of this leaf.
+  * All the nodes that splitting may need are allocated up-front,
+  * so an allocation failure is thrown before the tree is touched.
+  */
+ void insert(int i, Key k, data* d, Less less) {
+     prealloc nodes;
+
+     /*
+      * Prepare the nodes for split in advance, if the node::create will
+      * start throwing while splitting we'll have troubles "unsplitting"
+      * the nodes back.
+      */
+     node* cur = this;
+     while (cur->_num_keys == NodeSize) {
+         // One new sibling for every full node on the way up ...
+         nodes.push();
+         if (cur->is_root()) {
+             // ... and one more for the new root if the root splits too
+             nodes.push();
+             break;
+         }
+         cur = cur->_parent;
+     }
+
+     // Standard designated initializer instead of the obsolete GNU "d:" form
+     insert(i, std::move(k), node_or_data{.d = d}, less, nodes);
+     assert(nodes.empty());
+ }
+
+ // Removes the key at index i together with the matching kid slot.
+ // A non-root node is then refilled if it became underflown. A
+ // non-leaf root left without keys is replaced by its only kid.
+ void remove_from(int i, Less less) {
+ _keys[i].reset();
+ shift_left(i);
+
+ if (!is_root()) {
+ if (need_refill()) {
+ refill(less);
+ }
+ } else if (_num_keys == 0 && !is_leaf()) {
+ node* nr;
+ nr = _kids[0].n;
+ nr->_flags |= node::NODE_ROOT;
+ _root_tree->do_set_root(nr);
+
+ // The drop() asserts the node is not root, so take the flag off first
+ _flags &= ~node::NODE_ROOT;
+ _parent = nullptr;
+ drop();
+ }
+ }
+
+ // Merges the kid n into the kid t using the key at idx as the
+ // separation one, drops the n and removes the key from this node
+ void merge_kids(node& t, node& n, int idx, Less less) {
+ n.merge_into(t, std::move(_keys[idx].v));
+ n.drop();
+ remove_from(idx, less);
+ }
+
+ // Refills the underflown non-root node by grabbing an element from
+ // a sibling or by merging with one, see the comment inside for the
+ // order in which the options are tried
+ void refill(Less less) {
+ node& p = *_parent, *left, *right;
+
+ /*
+ * We need to locate this node's index at parent array by using
+ * the 0th key, so make sure it exists. We can go even without
+ * it, but since we don't let's be on the safe side.
+ */
+ assert(_num_keys > 0);
+ int i = p.index_for(_keys[0].v, less);
+ assert(p._kids[i].n == this);
+
+ /*
+ * The node is "underflown" (see comment near NodeHalf
+ * about what this means), so we try to refill it at the
+ * siblings' expense. Many cases possible, but we go with
+ * only four:
+ *
+ * 1. Left sibling exists and it has at least 1 item
+ * above being the half-full. -> we grab one element
+ * from it.
+ *
+ * 2. Left sibling exists and we can merge current with
+ * it. "Can" means the resulting node will not overflow
+ * which, in turn, differs by one for leaf and non-leaf
+ * nodes. For leaves the merge is possible is the total
+ * number of the elements fits the maximum. For non-leaf
+ * we'll need room for one more element, here's why:
+ *
+ * [012] + [456] -> [012X456]
+ * [A012] + [B456] -> [A012B456]
+ *
+ * The key X in the middle separates B from everything on
+ * the left and this key was not sitting on either of the
+ * wannabe merging nodes. This X is the current separation
+ * of these two nodes taken from their parent.
+ *
+ * And two same cases for the right sibling.
+ */
+
+ left = i > 0 ? p._kids[i - 1].n : nullptr;
+ right = i < p._num_keys ? p._kids[i + 1].n : nullptr;
+
+ // Grabbing is tried first for both siblings, merging goes after it
+ if (left != nullptr && left->can_grab_from()) {
+ grab_from_left(*left, p._keys[i - 1]);
+ return;
+ }
+
+ if (right != nullptr && right->can_grab_from()) {
+ grab_from_right(*right, p._keys[i]);
+ return;
+ }
+
+ if (left != nullptr && can_merge_with(*left)) {
+ p.merge_kids(*left, *this, i - 1, less);
+ return;
+ }
+
+ if (right != nullptr && can_merge_with(*right)) {
+ p.merge_kids(*this, *right, i, less);
+ return;
+ }
+
+ /*
+ * Surprisingly, the node in the B+ tree can violate the
+ * "minimally filled" rule for non roots. It _can_ stay with
+ * less than half elements on board. The next remove from
+ * it or either of its siblings will probably refill it.
+ *
+ * Keeping 1 key on the non-root node is possible, but needs
+ * some special care -- if we will remove this last key from
+ * this node, the code will try to refill one and will not
+ * be able to find this node's index at parent (the call for
+ * index_for() above).
+ */
+ assert(_num_keys > 1);
+ }
+
+ // Removes the key at index i (and the matching kid) from the node,
+ // updating the separation key in an ancestor first if needed
+ void remove(int i, Less less) {
+ assert(i >= 0);
+ /*
+ * Update the matching separation key from above. It
+ * exists only if we're removing the 0th key, but for
+ * the left-most child it doesn't exist.
+ *
+ * Note, that the latter check is crucial for clear()
+ * performance, as it's always removes the left-most
+ * key, without this check each remove() would walk the
+ * tree upwards in vain.
+ */
+ if (strict_separation_key && i == 0 && !is_leftmost()) {
+ const Key& k = _keys[i].v;
+ node* p = this;
+
+ // Climb up until an ancestor with a key on the left of our subtree is found
+ while (!p->is_root()) {
+ p = p->_parent;
+ int j = p->index_for(k, less) - 1;
+ if (j >= 0) {
+ // The key at index 1 becomes the 0th one once the removal is done
+ p->_keys[j].replace(std::move(copy_key(_keys[1].v)));
+ break;
+ }
+ }
+ }
+
+ remove_from(i, less);
+ }
+
+public:
+ // A new node has no keys, no flags set and no parent
+ explicit node() : _num_keys(0) , _flags(0) , _parent(nullptr) { }
+
+ // By the time of destruction the node must have no keys. A non-root
+ // leaf must additionally be out of the leaves list, i.e. self-linked.
+ ~node() {
+ assert(_num_keys == 0);
+ assert(is_root() || !is_leaf() || (get_prev() == this && get_next() == this));
+ }
+
+ /*
+ * Move constructor. The new node takes over the other's place in the
+ * tree: the leaves list neighbours (or the tree's left/right pointers),
+ * the kids' back pointers and the parent's kid slot (or the tree's root
+ * pointer) are all switched to this. The other node is left without
+ * keys and, if it's a leaf, unflagged and self-linked.
+ */
+ node(node&& other) noexcept : _flags(other._flags) {
+ if (is_leaf()) {
+ if (!is_rightmost()) {
+ set_next(other.get_next());
+ get_next()->set_prev(this);
+ } else {
+ other._rightmost_tree->do_set_right(this);
+ }
+
+ if (!is_leftmost()) {
+ set_prev(other.get_prev());
+ get_prev()->set_next(this);
+ } else {
+ other._kids[0]._leftmost_tree->do_set_left(this);
+ }
+
+ other._flags &= ~(NODE_LEFTMOST | NODE_RIGHTMOST);
+ other.set_next(&other);
+ other.set_prev(&other);
+ } else {
+ // The 0th kid has no matching key and is not covered by move_to() below
+ _kids[0].n = other._kids[0].n;
+ _kids[0].n->_parent = this;
+ }
+
+ // Moves all the keys and kids, this also sets _num_keys of both nodes
+ other.move_to(*this, 0, other._num_keys);
+
+ if (!is_root()) {
+ _parent = other._parent;
+ int i = _parent->index_for(&other);
+ assert(_parent->_kids[i].n == &other);
+ _parent->_kids[i].n = this;
+ } else {
+ other._root_tree->do_set_root(this);
+ }
+ }
+
+ /*
+  * Finds the index of the data d in the leaf's _kids array, the
+  * data is asserted to be there. Datas start from index 1.
+  *
+  * We could look up the data's new index with binary search,
+  * but we don't have the key at hand, so it's a linear scan.
+  */
+ int index_for(data *d) const {
+     int pos = 1;
+
+     while (pos <= _num_keys && _kids[pos].d != d) {
+         pos++;
+     }
+
+     assert(pos <= _num_keys);
+     return pos;
+ }
+
+private:
+ /*
+  * A pool of nodes allocated before the insertion starts to split
+  * nodes. The nodes that were not popped by the time the pool is
+  * destroyed (or drained) are destroyed by it.
+  */
+ class prealloc {
+     std::vector<node*> _nodes;
+ public:
+     prealloc() = default;
+     // The pool owns raw node pointers, a copy would destroy them twice
+     prealloc(const prealloc&) = delete;
+     prealloc& operator=(const prealloc&) = delete;
+
+     bool empty() const { return _nodes.empty(); }
+
+     // Allocates one more node into the pool
+     void push() {
+         node* n = node::create();
+         try {
+             _nodes.push_back(n);
+         } catch (...) {
+             // The vector failed to grow, don't leak the node just created
+             node::destroy(*n);
+             throw;
+         }
+     }
+
+     // Hands the most recently pushed node over to the caller
+     node* pop() {
+         assert(!_nodes.empty());
+         node* ret = _nodes.back();
+         _nodes.pop_back();
+         return ret;
+     }
+
+     // Destroys all the nodes left in the pool
+     void drain() {
+         while (!empty()) {
+             node::destroy(*pop());
+         }
+     }
+
+     ~prealloc() {
+         drain();
+     }
+ };
+
+ // Accounts this node in the st counters. Inner nodes
+ // then do the same for every kid recursively.
+ void fill_stats(struct stats& st) {
+     if (!is_leaf()) {
+         st.nodes_filled[_num_keys]++;
+         st.nodes++;
+         // An inner node with N keys has N + 1 kids
+         for (int kid = 0; kid <= _num_keys; kid++) {
+             _kids[kid].n->fill_stats(st);
+         }
+         return;
+     }
+
+     st.leaves_filled[_num_keys]++;
+     st.leaves++;
+     st.datas += _num_keys;
+ }
+};
+
+/*
+ * The data represents data node (the actual data is stored "outside"
+ * of the tree). The tree::emplace() constructs the payload inside the
+ * data before inserting it into the tree.
+ */
+template <typename K, typename T, typename Less, int NS, key_search S, with_debug D>
+class data final {
+ friend class validator<K, T, Less, NS>;
+ template <typename c1, typename c2, typename c3, int s1, key_search p1, with_debug p2>
+ friend class tree<c1, c2, c3, s1, p1, p2>::iterator;
+
+ using node = class node<K, T, Less, NS, S, D>;
+
+ // The leaf the data sits on, nullptr means the data is not attached
+ node* _leaf;
+ T value;
+
+public:
+ // Constructs a not attached data with the current allocator,
+ // the args are forwarded to the value constructor
+ template <typename... Args>
+ static data* create(Args&&... args) {
+ return current_allocator().construct<data>(std::forward<Args>(args)...);
+ }
+
+ // Calls the disposer on the value, then detaches
+ // and destroys the data with the current allocator
+ template <typename Func>
+ GCC6_CONCEPT(requires requires (Func f, T val) { { f(val) } -> void; } )
+ static void destroy(data& d, Func&& disp) {
+ disp(d.value);
+ // The destructor asserts the data is not attached
+ d._leaf = nullptr;
+ current_allocator().destroy(&d);
+ }
+
+ template <typename... Args>
+ data(Args&& ... args) : _leaf(nullptr), value(std::forward<Args>(args)...) {}
+
+ // Move constructor. If the other data is attached, the new one takes
+ // its slot on the leaf over and the other becomes not attached.
+ data(data&& other) noexcept : _leaf(other._leaf), value(std::move(other.value)) {
+ if (attached()) {
+ int i = _leaf->index_for(&other);
+ _leaf->_kids[i].d = this;
+ other._leaf = nullptr;
+ }
+ }
+
+ ~data() { assert(!attached()); }
+
+ bool attached() const { return _leaf != nullptr; }
+
+ // Remembers the leaf for a data that's not yet attached
+ void attach(node& to) {
+ assert(!attached());
+ _leaf = &to;
+ }
+
+ // Switches an already attached data to another leaf
+ void reattach(node* to) {
+ assert(attached());
+ _leaf = to;
+ }
+
+ // Size of the data with the sizeof(T) part replaced by what
+ // size_for_allocation_strategy() reports for the value
+ size_t storage_size() const {
+ return sizeof(data) - sizeof(T) + size_for_allocation_strategy(value);
+ }
+
+ friend size_t size_for_allocation_strategy(const data& obj) {
+ return obj.storage_size();
+ }
+};
+
+} // namespace bplus
diff --git a/test/boost/bptree_test.cc b/test/boost/bptree_test.cc
new file mode 100644
index 000000000..ea8c6ce71
--- /dev/null
+++ b/test/boost/bptree_test.cc
@@ -0,0 +1,344 @@
+
+/*
+ * Copyright (C) 2020 ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#define BOOST_TEST_MODULE bptree
+
+#include <boost/test/unit_test.hpp>
+#include <fmt/core.h>
+
+#include "utils/bptree.hh"
+#include "test/unit/bptree_key.hh"
+
+struct int_compare {
+    bool operator()(const int& lhs, const int& rhs) const { return lhs < rhs; } /* strict "less" for ints */
+};
+
+using namespace bplus;
+using test_tree = tree<int, unsigned long, int_compare, 4, key_search::both, with_debug::yes>; /* int -> unsigned long tree used by the cases below, unless they declare their own alias */
+
+BOOST_AUTO_TEST_CASE(test_ops_empty_tree) {
+    /* Erase and lookup on an empty tree must not dereference nullptr */
+    test_tree empty(int_compare{});
+    empty.erase(1);
+    empty.find(1);
+}
+
+BOOST_AUTO_TEST_CASE(test_double_insert) {
+    /* A second emplace() of the same key must report "not inserted"; ~tree must not assert */
+    test_tree bpt(int_compare{});
+    auto res = bpt.emplace(1, 1);
+    BOOST_REQUIRE(res.second);
+    res = bpt.emplace(1, 1);
+    BOOST_REQUIRE(!res.second);
+    bpt.erase(1);
+}
+
+BOOST_AUTO_TEST_CASE(test_cookie_find) {
+    /* Less-comparator that also accepts a plain int on either side of a test_key */
+    struct int_to_key_compare {
+        bool operator()(const test_key& lhs, const int& rhs) const { return static_cast<int>(lhs) < rhs; }
+        bool operator()(const int& lhs, const test_key& rhs) const { return lhs < static_cast<int>(rhs); }
+        bool operator()(const test_key& lhs, const test_key& rhs) const {
+            return test_key_compare{}(lhs, rhs);
+        }
+    };
+
+    using test_tree = tree<test_key, int, int_to_key_compare, 4, key_search::both, with_debug::yes>;
+
+    test_tree keyed(int_to_key_compare{});
+    keyed.emplace(test_key{1}, 132);
+
+    auto found = keyed.find(1); /* lookup with an int, not with a test_key */
+    BOOST_REQUIRE(*found == 132);
+    keyed.clear();
+}
+
+BOOST_AUTO_TEST_CASE(test_double_erase) {
+    test_tree bpt(int_compare{});
+    bpt.emplace(1, 1);
+    bpt.emplace(2, 2);
+    auto next = bpt.erase(1); /* expected to point to the element after the erased one */
+    BOOST_REQUIRE(*next == 2);
+    next = bpt.erase(1); /* the key is already gone */
+    BOOST_REQUIRE(next == bpt.end());
+    next = bpt.erase(2);
+    BOOST_REQUIRE(next == bpt.end());
+    bpt.erase(2);
+}
+
+BOOST_AUTO_TEST_CASE(test_remove_corner_case) {
+    /* Erasing both neighbours must leave the middle element intact */
+    test_tree bpt(int_compare{});
+    bpt.emplace(1, 1);
+    bpt.emplace(2, 123);
+    bpt.emplace(3, 3);
+    bpt.erase(1);
+    bpt.erase(3);
+    auto middle = bpt.find(2);
+    BOOST_REQUIRE(*middle == 123);
+    bpt.erase(2);
+}
+
+BOOST_AUTO_TEST_CASE(test_end_iterator) {
+    /* Check that std::prev(end()) lands on the last (here: the only) element */
+    test_tree t(int_compare{});
+    t.emplace(1, 123);
+    auto i = std::prev(t.end());
+    BOOST_REQUIRE(*i == 123); /* must compare: '=' would overwrite the value and always pass */
+    t.erase(1);
+}
+
+BOOST_AUTO_TEST_CASE(test_next_to_end_iterator) {
+    /* Step back from an "artificial" end iterator, i.e. one obtained by incrementing past the last element */
+    test_tree t(int_compare{});
+    auto i = t.emplace(1, 123).first;
+    i++;
+    BOOST_REQUIRE(i == t.end());
+    i--;
+    BOOST_REQUIRE(*i == 123); /* must compare: '=' would overwrite the value and always pass */
+    t.erase(1);
+}
+
+BOOST_AUTO_TEST_CASE(test_clear) {
+    /* Smoke test: clear() on a tree holding 32 elements */
+    test_tree bpt(int_compare{});
+
+    for (int key = 0; key != 32; ++key) {
+        bpt.emplace(key, key);
+    }
+
+    bpt.clear();
+}
+
+BOOST_AUTO_TEST_CASE(test_post_clear) {
+    /* The tree must stay usable after clear() */
+    test_tree bpt(int_compare{});
+
+    bpt.emplace(1, 1);
+    bpt.clear();
+    bpt.emplace(2, 2);
+    bpt.erase(2);
+}
+
+BOOST_AUTO_TEST_CASE(test_iterator_erase) {
+    /* Erase through the iterator returned by emplace(); the other key must survive */
+    test_tree bpt(int_compare{});
+    auto inserted = bpt.emplace(2, 2);
+    bpt.emplace(1, 321);
+    inserted.first.erase(int_compare{});
+    BOOST_REQUIRE(*bpt.find(1) == 321);
+    bpt.erase(1);
+}
+
+BOOST_AUTO_TEST_CASE(test_iterator_equal) {
+    test_tree bpt(int_compare{});
+    auto first = bpt.emplace(1, 1);
+    auto second = bpt.emplace(2, 2);
+    auto looked_up = bpt.find(1); /* must equal the iterator emplace() returned for key 1 */
+    BOOST_REQUIRE(first.first == looked_up);
+    BOOST_REQUIRE(first.first != second.first);
+    bpt.clear();
+}
+
+BOOST_AUTO_TEST_CASE(test_lower_bound) {
+    test_tree bpt(int_compare{});
+    bpt.emplace(1, 11);
+    bpt.emplace(3, 13);
+
+    bool exact; /* filled in by lower_bound(): expected true only when the key itself is present */
+    BOOST_REQUIRE(*bpt.lower_bound(0, exact) == 11 && !exact);
+    BOOST_REQUIRE(*bpt.lower_bound(1, exact) == 11 && exact);
+    BOOST_REQUIRE(*bpt.lower_bound(2, exact) == 13 && !exact);
+    BOOST_REQUIRE(*bpt.lower_bound(3, exact) == 13 && exact);
+    BOOST_REQUIRE(bpt.lower_bound(4, exact) == bpt.end() && !exact);
+    bpt.clear();
+}
+
+BOOST_AUTO_TEST_CASE(test_upper_bound) {
+    test_tree bpt(int_compare{});
+    bpt.emplace(1, 11);
+    bpt.emplace(3, 13);
+    /* upper_bound(k) must give the first element with a key strictly greater than k */
+    BOOST_REQUIRE(*bpt.upper_bound(0) == 11);
+    BOOST_REQUIRE(*bpt.upper_bound(1) == 13);
+    BOOST_REQUIRE(*bpt.upper_bound(2) == 13);
+    BOOST_REQUIRE(bpt.upper_bound(3) == bpt.end());
+    BOOST_REQUIRE(bpt.upper_bound(4) == bpt.end());
+    bpt.clear();
+}
+
+BOOST_AUTO_TEST_CASE(test_insert_iterator_index) {
+    /* The iterator returned by emplace() can be advanced, both for a fresh and for a duplicate key */
+    test_tree bpt(int_compare{});
+    bpt.emplace(1, 10);
+    bpt.emplace(3, 13);
+    auto fresh = bpt.emplace(2, 2).first;
+    fresh++;
+    BOOST_REQUIRE(*fresh == 13);
+    auto dup = bpt.emplace(2, 2); /* key 2 is already there, the previous entry is found */
+    BOOST_REQUIRE(!dup.second);
+    dup.first++;
+    BOOST_REQUIRE(*(dup.first) == 13);
+    bpt.clear();
+}
+
+BOOST_AUTO_TEST_CASE(test_insert_before) {
+    /* Check iterator::emplace_before: the new element becomes the predecessor */
+    test_tree bpt(int_compare{});
+    auto at3 = bpt.emplace(3, 13).first;
+    auto at2 = at3.emplace_before(2, int_compare{}, 12);
+    BOOST_REQUIRE(++at2 == at3);
+    BOOST_REQUIRE(*at3 == 13);
+    BOOST_REQUIRE(*--at2 == 12);
+    BOOST_REQUIRE(*--at3 == 12);
+    bpt.clear();
+}
+
+BOOST_AUTO_TEST_CASE(test_insert_before_end) {
+    /* emplace_before() on the end() iterator appends after the last element */
+    test_tree bpt(int_compare{});
+    auto head = bpt.emplace(1, 1).first;
+    auto tail = bpt.end().emplace_before(2, int_compare{}, 12);
+    BOOST_REQUIRE(++head == tail);
+    BOOST_REQUIRE(++tail == bpt.end());
+    bpt.clear();
+}
+
+BOOST_AUTO_TEST_CASE(test_insert_before_end_empty) {
+    /* emplace_before() on end() of an empty tree creates the first element */
+    test_tree bpt(int_compare{});
+    auto only = bpt.end().emplace_before(42, int_compare{}, 142);
+    BOOST_REQUIRE(only == bpt.begin());
+    bpt.erase(42);
+}
+
+BOOST_AUTO_TEST_CASE(test_iterators) {
+    test_tree bpt(int_compare{});
+    /* Neither direction may visit anything while the tree is empty */
+    for (auto rit = bpt.rbegin(); rit != bpt.rend(); rit++) {
+        BOOST_REQUIRE(false);
+    }
+    for (auto fit = bpt.begin(); fit != bpt.end(); fit++) {
+        BOOST_REQUIRE(false);
+    }
+
+    bpt.emplace(1, 7);
+    bpt.emplace(2, 9);
+
+    { /* forward walk: ascending key order */
+        auto fwd = bpt.begin();
+        BOOST_REQUIRE(*(fwd++) == 7);
+        BOOST_REQUIRE(*(fwd++) == 9);
+        BOOST_REQUIRE(fwd == bpt.end());
+    }
+
+    { /* reverse walk: descending key order */
+        auto rev = bpt.rbegin();
+        BOOST_REQUIRE(*(rev++) == 9);
+        BOOST_REQUIRE(*(rev++) == 7);
+        BOOST_REQUIRE(rev == bpt.rend());
+    }
+
+    bpt.clear();
+}
+
+/*
+ * Payload type for the "self-iterator" tests, where an iterator is
+ * built directly from a pointer to the stored value. See the comment
+ * near the bptree::iterator(T* d) constructor for details.
+ */
+class tree_data {
+    int _key;
+    int _cookie;
+public:
+    explicit tree_data(int cookie) : tree_data(-1, cookie) {} /* no key given: -1 marks it as unset */
+    tree_data(int key, int cookie) : _key(key), _cookie(cookie) {}
+    int cookie() const { return _cookie; }
+    int key() const {
+        assert(_key != -1); /* only meaningful for objects constructed with a key */
+        return _key;
+    }
+};
+
+BOOST_AUTO_TEST_CASE(test_data_self_iterator) {
+    using test_tree = tree<int, tree_data, int_compare, 4, key_search::both, with_debug::yes>;
+
+    test_tree bpt(int_compare{});
+    auto inserted = bpt.emplace(1, 42);
+    BOOST_REQUIRE(inserted.second);
+
+    tree_data* payload = &(*inserted.first);
+    BOOST_REQUIRE(payload->cookie() == 42);
+    /* Rebuild an iterator from nothing but the address of the stored value */
+    test_tree::iterator self(payload);
+    BOOST_REQUIRE(self->cookie() == 42);
+
+    self.erase(int_compare{});
+    BOOST_REQUIRE(bpt.find(1) == bpt.end());
+}
+
+BOOST_AUTO_TEST_CASE(test_insert_before_nokey) {
+    using test_tree = tree<int, tree_data, int_compare, 4, key_search::both, with_debug::yes>;
+    /* emplace_before() flavour that takes the comparator first and no separate key argument */
+    test_tree bpt(int_compare{});
+    auto at2 = bpt.emplace(2, 52).first;
+    auto added = at2.emplace_before(int_compare{}, 1, 42);
+    BOOST_REQUIRE(added->cookie() == 42);
+    added++;
+    BOOST_REQUIRE(added == at2);
+    bpt.clear();
+}
+
+
+BOOST_AUTO_TEST_CASE(test_self_iterator_rover) {
+    test_tree bpt(int_compare{});
+    auto it = bpt.emplace(2, 42).first;
+    unsigned long* payload = &(*it);
+    test_tree::iterator self(payload); /* iterator built from the value's address */
+
+    it = self.emplace_before(1, int_compare{}, 31);
+    BOOST_REQUIRE(*it == 31);
+    BOOST_REQUIRE(*(++it) == 42);
+    BOOST_REQUIRE(++it == bpt.end());
+    BOOST_REQUIRE(++self == bpt.end());
+    bpt.clear();
+}
+
+BOOST_AUTO_TEST_CASE(test_erase_range) {
+    /* Quick check for tree::erase(from, to): keys 8..24 go away, 25 stays */
+    test_tree bpt(int_compare{});
+
+    for (int key = 0; key != 32; ++key) {
+        bpt.emplace(key, key);
+    }
+
+    auto from = bpt.find(8);
+    auto to = bpt.find(25);
+    bpt.erase(from, to);
+
+    BOOST_REQUIRE(*bpt.find(7) == 7);
+    BOOST_REQUIRE(bpt.find(8) == bpt.end());
+    BOOST_REQUIRE(bpt.find(24) == bpt.end());
+    BOOST_REQUIRE(*bpt.find(25) == 25);
+
+    bpt.clear();
+}
diff --git a/test/perf/perf_bptree.cc b/test/perf/perf_bptree.cc
new file mode 100644
index 000000000..2de5e5139
--- /dev/null
+++ b/test/perf/perf_bptree.cc
@@ -0,0 +1,165 @@
+/*
+ * Copyright (C) 2020 ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <seastar/core/app-template.hh>
+#include <seastar/core/thread.hh>
+#include <algorithm>
+#include <vector>
+#include <random>
+#include <fmt/core.h>
+
+using key_t = int; /* key type shared by all the tested collections */
+
+struct key_compare {
+ bool operator()(const key_t& a, const key_t& b) const { return a < b; } /* plain less-than on keys */
+};
+
+#include "utils/bptree.hh"
+
+using namespace bplus;
+using namespace seastar;
+
+constexpr int TEST_NODE_SIZE = 4; /* node size the benchmarked tree is instantiated with */
+
+/* On node size 4 (this test) linear search works better */
+using test_tree = tree<key_t, unsigned long, key_compare, TEST_NODE_SIZE, key_search::linear>; /* key_t -> unsigned long, no with_debug argument given */
+
+class collection_tester { /* common interface of the benchmarked collections */
+public:
+    virtual ~collection_tester() = default;
+    virtual void insert(key_t k) = 0;
+    virtual void erase(key_t k) = 0;
+    virtual void show_stats() = 0;
+};
+
+class bptree_tester : public collection_tester { /* runs the benchmark against the B+ tree */
+    test_tree _t;
+public:
+    bptree_tester() : _t(key_compare{}) {}
+    virtual void insert(key_t k) override { _t.emplace(k, 0); }
+    virtual void erase(key_t k) override { _t.erase(k); }
+    virtual void show_stats() override { /* prints the counters from get_stats(), each *_filled entry with its share */
+        struct bplus::stats st = _t.get_stats();
+        fmt::print("nodes: {}\n", st.nodes);
+        for (int i = 0; i < (int)st.nodes_filled.size(); i++) { /* with no inner nodes report 0% rather than divide by zero */
+            fmt::print(" {}: {} ({}%)\n", i, st.nodes_filled[i], st.nodes ? st.nodes_filled[i] * 100 / st.nodes : 0);
+        }
+        fmt::print("leaves: {}\n", st.leaves);
+        for (int i = 0; i < (int)st.leaves_filled.size(); i++) { /* same guard for a tree without leaves */
+            fmt::print(" {}: {} ({}%)\n", i, st.leaves_filled[i], st.leaves ? st.leaves_filled[i] * 100 / st.leaves : 0);
+        }
+        fmt::print("datas: {}\n", st.datas);
+    }
+    virtual ~bptree_tester() = default;
+};
+
+class set_tester : public collection_tester { /* std::set baseline, stores the keys only */
+    std::set<key_t> _s;
+public:
+    virtual void insert(key_t k) override { _s.insert(k); }
+    virtual void erase(key_t k) override { _s.erase(k); }
+    virtual void show_stats() override { } /* nothing to report; `override` makes the compiler check the signature */
+    virtual ~set_tester() = default;
+};
+
+class map_tester : public collection_tester { /* std::map baseline, maps every key to 0 */
+    std::map<key_t, unsigned long> _m;
+public:
+    virtual void insert(key_t k) override { _m[k] = 0; }
+    virtual void erase(key_t k) override { _m.erase(k); }
+    virtual void show_stats() override { } /* nothing to report; `override` makes the compiler check the signature */
+    virtual ~map_tester() = default;
+};
+
+int main(int argc, char **argv) { /* insert/erase benchmark for the collection selected with --col */
+ namespace bpo = boost::program_options;
+ app_template app;
+ app.add_options()
+ ("count", bpo::value<int>()->default_value(5000000), "number of keys to fill the tree with")
+ ("batch", bpo::value<int>()->default_value(100), "number of operations between deferring points")
+ ("iters", bpo::value<int>()->default_value(1), "number of iterations")
+ ("col", bpo::value<std::string>()->default_value("bptree"), "collection to test")
+ ("stats", bpo::value<bool>()->default_value(false), "show stats");
+
+ return app.run(argc, argv, [&app] {
+ auto count = app.configuration()["count"].as<int>();
+ auto iters = app.configuration()["iters"].as<int>();
+ auto batch = app.configuration()["batch"].as<int>(); /* NOTE(review): used as a modulo divisor below, 0 is not handled */
+ auto col = app.configuration()["col"].as<std::string>();
+ auto stats = app.configuration()["stats"].as<bool>();
+
+ return seastar::async([count, iters, batch, col, stats] {
+ int rep = iters;
+ collection_tester* c; /* assigned in every branch that continues; released by `delete c` at the end */
+
+ if (col == "bptree") {
+ c = new bptree_tester();
+ } else if (col == "set") {
+ c = new set_tester();
+ } else if (col == "map") {
+ c = new map_tester();
+ } else {
+ fmt::print("Unknown collection\n");
+ return;
+ }
+
+ std::vector<int> keys;
+
+ for (int i = 0; i < count; i++) {
+ keys.push_back(i + 1); /* keys are 1..count */
+ }
+
+ std::random_device rd;
+ std::mt19937 g(rd());
+
+ fmt::print("Inserting {:d} k:v pairs into {} {:d} times\n", count, col, iters);
+
+ again: /* one iteration: insert all keys in random order, then erase them in another random order */
+ std::shuffle(keys.begin(), keys.end(), g);
+ seastar::thread::yield();
+ for (int i = 0; i < count; i++) {
+ c->insert(keys[i]);
+ if ((i + 1) % batch == 0) { /* yield after every `batch` operations */
+ seastar::thread::yield();
+ }
+ }
+
+ if (stats) {
+ c->show_stats(); /* shown while the collection is full */
+ }
+
+ std::shuffle(keys.begin(), keys.end(), g);
+ seastar::thread::yield();
+ for (int i = 0; i < count; i++) {
+ c->erase(keys[i]);
+ if ((i + 1) % batch == 0) {
+ seastar::thread::yield();
+ }
+ }
+
+ if (--rep > 0) {
+ goto again;
+ }
+
+ delete c;
+ });
+ });
+}
diff --git a/test/perf/perf_bptree_drain.cc b/test/perf/perf_bptree_drain.cc
new file mode 100644
index 000000000..703e15719
--- /dev/null
+++ b/test/perf/perf_bptree_drain.cc
@@ -0,0 +1,154 @@
+/*
+ * Copyright (C) 2020 ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <chrono>
+#include <seastar/core/app-template.hh>
+#include <seastar/core/thread.hh>
+#include <seastar/core/weak_ptr.hh>
+#include <algorithm>
+#include <vector>
+#include <random>
+#include <fmt/core.h>
+#include "perf.hh"
+
+using key_t = int; /* key type shared by all the tested collections */
+
+struct key_compare {
+ bool operator()(const key_t& a, const key_t& b) const { return a < b; } /* plain less-than on keys */
+};
+
+#include "utils/bptree.hh"
+
+using namespace bplus;
+using namespace seastar;
+
+constexpr int TEST_NODE_SIZE = 4; /* node size the benchmarked tree is instantiated with */
+
+/* On node size 4 (this test) linear search works better */
+using test_tree = tree<key_t, unsigned long, key_compare, TEST_NODE_SIZE, key_search::linear>; /* key_t -> unsigned long, no with_debug argument given */
+
+class collection_tester { /* common interface of the collections whose draining is measured */
+public:
+    virtual ~collection_tester() = default;
+    virtual void insert(key_t k) = 0;
+    virtual void drain(int batch) = 0;
+};
+
+class bptree_tester : public collection_tester {
+    test_tree _t;
+public:
+    bptree_tester() : _t(key_compare{}) {}
+    virtual void insert(key_t k) override { _t.emplace(k, 0); }
+    virtual void drain(int batch) override {
+        /* Erase element by element starting from begin(), yielding every `batch` removals */
+        int erased = 0;
+        for (auto pos = _t.begin(); pos != _t.end(); ) {
+            pos = pos.erase(key_compare{});
+            if (++erased % batch == 0) {
+                seastar::thread::yield();
+            }
+        }
+    }
+    virtual ~bptree_tester() = default;
+};
+
+class set_tester : public collection_tester {
+    std::set<key_t> _s;
+public:
+    virtual void insert(key_t k) override { _s.insert(k); }
+    virtual void drain(int batch) override {
+        /* Erase element by element starting from begin(), yielding every `batch` removals */
+        int erased = 0;
+        for (auto pos = _s.begin(); pos != _s.end(); ) {
+            pos = _s.erase(pos);
+            if (++erased % batch == 0) {
+                seastar::thread::yield();
+            }
+        }
+    }
+    virtual ~set_tester() = default;
+};
+
+int main(int argc, char **argv) { /* measures how long it takes to drain the collection selected with --col */
+ namespace bpo = boost::program_options;
+ app_template app;
+ app.add_options()
+ ("count", bpo::value<int>()->default_value(5000000), "number of keys to fill the tree with")
+ ("batch", bpo::value<int>()->default_value(100), "number of operations between deferring points")
+ ("iters", bpo::value<int>()->default_value(1), "number of iterations")
+ ("col", bpo::value<std::string>()->default_value("bptree"), "collection to test");
+
+ return app.run(argc, argv, [&app] {
+ auto count = app.configuration()["count"].as<int>();
+ auto iters = app.configuration()["iters"].as<int>();
+ auto batch = app.configuration()["batch"].as<int>(); /* NOTE(review): used as a modulo divisor below and in drain(), 0 is not handled */
+ auto col = app.configuration()["col"].as<std::string>();
+
+ return seastar::async([count, iters, batch, col] {
+ int rep = iters;
+ collection_tester* c; /* assigned in every branch that continues; released by `delete c` at the end */
+
+ if (col == "bptree") {
+ c = new bptree_tester();
+ } else if (col == "set") {
+ c = new set_tester();
+ } else {
+ fmt::print("Unknown collection\n");
+ return;
+ }
+
+ std::vector<int> keys;
+
+ for (int i = 0; i < count; i++) {
+ keys.push_back(i + 1); /* keys are 1..count */
+ }
+
+ std::random_device rd;
+ std::mt19937 g(rd());
+
+ fmt::print("Inserting {:d} k:v pairs into {} {:d} times\n", count, col, iters);
+
+ again: /* one iteration: fill in random order, then time the drain */
+ std::shuffle(keys.begin(), keys.end(), g);
+ seastar::thread::yield();
+ for (int i = 0; i < count; i++) {
+ c->insert(keys[i]);
+ if ((i + 1) % batch == 0) { /* yield after every `batch` insertions */
+ seastar::thread::yield();
+ }
+ }
+
+ scheduling_latency_measurer invalidate_slm; /* running only around the drain() call */
+ invalidate_slm.start();
+ auto d = duration_in_seconds([&] {
+ c->drain(batch);
+ });
+ invalidate_slm.stop();
+ fmt::print("drain: {:.6f} [ms], preemption: {}\n", d.count() * 1000, invalidate_slm);
+
+ if (--rep > 0) {
+ goto again;
+ }
+
+ delete c;
+ });
+ });
+}
diff --git a/test/unit/bptree_compaction_test.cc b/test/unit/bptree_compaction_test.cc
new file mode 100644
index 000000000..9b1b48072
--- /dev/null
+++ b/test/unit/bptree_compaction_test.cc
@@ -0,0 +1,210 @@
+/*
+ * Copyright (C) 2020 ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <seastar/core/app-template.hh>
+#include <seastar/core/thread.hh>
+#include <map>
+#include <vector>
+#include <random>
+#include <string>
+#include <iostream>
+#include <fmt/core.h>
+#include "utils/logalloc.hh"
+
+constexpr int TEST_NODE_SIZE = 7; /* node size of the tested tree; defined ahead of the includes below (presumably they rely on it - confirm) */
+
+#include "bptree_key.hh"
+#include "utils/bptree.hh"
+#include "bptree_validation.hh"
+
+using namespace bplus;
+using namespace seastar;
+
+class test_data {
+    int _value;
+public:
+    test_data() : _value(0) {}
+    test_data(test_key& k) : _value(static_cast<int>(k) + 10) {}
+    /* The stored value is always "key + 10", which match_key() verifies */
+    operator unsigned long() const { return _value; }
+    bool match_key(const test_key& k) const { return static_cast<int>(k) + 10 == _value; }
+};
+using test_tree = tree<test_key, test_data, test_key_compare, TEST_NODE_SIZE, key_search::both, with_debug::yes>; /* the tree under test */
+using test_validator = validator<test_key, test_data, test_key_compare, TEST_NODE_SIZE>; /* validator instantiated with the same key, data, comparator and node size */
+
+class reference { /* one end of a two-way link: linked peers point at each other */
+    reference* _ref = nullptr;
+public:
+    reference() = default;
+    reference(const reference& other) = delete;
+
+    /* Takes over the link: the peer (if any) is redirected to the new object */
+    reference(reference&& other) noexcept : _ref(other._ref) {
+        if (_ref) {
+            _ref->_ref = this;
+        }
+        other._ref = nullptr;
+    }
+    ~reference() { /* unlinks the peer, if any */
+        if (_ref) {
+            _ref->_ref = nullptr;
+        }
+    }
+
+    void link(reference& other) { /* ties this and other together; this must not be linked yet */
+        assert(_ref == nullptr);
+        _ref = &other;
+        other._ref = this;
+    }
+
+    reference* get() { /* returns the peer; the reference must be linked */
+        assert(_ref != nullptr);
+        return _ref;
+    }
+};
+
+class tree_pointer { /* handle to a tree_wrapper constructed with current_allocator() */
+    reference _ref;
+
+    class tree_wrapper {
+        friend class tree_pointer;
+        test_tree _tree;
+        reference _ref; /* linked with tree_pointer::_ref by the tree_pointer constructor */
+    public:
+        tree_wrapper() : _tree(test_key_compare{}) {}
+    };
+
+    /*
+     * Recovers the wrapper from the address of its _ref member, as
+     * currently reported by the link.
+     */
+    tree_wrapper* get_wrapper() {
+        return boost::intrusive::get_parent_from_member(_ref.get(), &tree_wrapper::_ref);
+    }
+
+public:
+    tree_pointer(const tree_pointer& other) = delete;
+    tree_pointer(tree_pointer&& other) = delete;
+
+    tree_pointer() {
+        auto* wrapper = current_allocator().construct<tree_wrapper>();
+        _ref.link(wrapper->_ref);
+    }
+
+    /* Both accessors look the wrapper up again on every call instead of caching its address */
+    test_tree* operator->() {
+        return &get_wrapper()->_tree;
+    }
+
+    test_tree& operator*() {
+        return get_wrapper()->_tree;
+    }
+
+    ~tree_pointer() { /* destroys the wrapper (and the tree in it) */
+        current_allocator().destroy(get_wrapper());
+    }
+};
+
+int main(int argc, char **argv) { /* fills and empties a tree living in a logalloc region, compacting and validating after each phase */
+ namespace bpo = boost::program_options;
+ app_template app;
+ app.add_options()
+ ("count", bpo::value<int>()->default_value(10000), "number of keys to fill the tree with")
+ ("iters", bpo::value<int>()->default_value(13), "number of iterations")
+ ("verb", bpo::value<bool>()->default_value(false), "be verbose");
+
+ return app.run(argc, argv, [&app] {
+ auto count = app.configuration()["count"].as<int>();
+ auto rep = app.configuration()["iters"].as<int>();
+ auto verb = app.configuration()["verb"].as<bool>();
+
+ return seastar::async([count, rep, verb] {
+ int iter = rep;
+ std::vector<int> keys;
+ for (int i = 0; i < count; i++) {
+ keys.push_back(i + 1); /* keys are 1..count */
+ }
+
+ std::random_device rd;
+ std::mt19937 g(rd());
+
+ fmt::print("Compacting {:d} k:v pairs {:d} times\n", count, iter);
+
+ test_validator tv;
+
+ logalloc::region mem;
+
+ with_allocator(mem.allocator(), [&] { /* everything below runs with the region's allocator */
+ tree_pointer t;
+
+ again:
+ {
+ std::shuffle(keys.begin(), keys.end(), g);
+
+ logalloc::reclaim_lock rl(mem); /* held for the whole fill loop, released before full_compaction() */
+
+ for (int i = 0; i < count; i++) {
+ test_key k(keys[i]);
+
+ auto ti = t->emplace(std::move(copy_key(k)), k);
+ assert(ti.second); /* keys are unique, so every emplace must insert */
+ seastar::thread::maybe_yield();
+ }
+ }
+
+ mem.full_compaction();
+
+ if (verb) {
+ fmt::print("After fill + compact\n");
+ tv.print_tree(*t, '|');
+ }
+
+ tv.validate(*t); /* validate the tree after compaction of the filled region */
+
+ {
+ std::shuffle(keys.begin(), keys.end(), g);
+
+ logalloc::reclaim_lock rl(mem); /* held for the whole erase loop */
+
+ for (int i = 0; i < count; i++) {
+ test_key k(keys[i]);
+
+ t->erase(k);
+ seastar::thread::maybe_yield();
+ }
+ }
+
+ mem.full_compaction();
+
+ if (verb) {
+ fmt::print("After erase + compact\n");
+ tv.print_tree(*t, '|');
+ }
+
+ tv.validate(*t); /* validate again once all keys are erased */
+
+ if (--iter > 0) {
+ seastar::thread::yield();
+ goto again;
+ }
+ });
+ });
+ });
+}
diff --git a/test/unit/bptree_stress_test.cc b/test/unit/bptree_stress_test.cc
new file mode 100644
index 000000000..3060b1a7b
--- /dev/null
+++ b/test/unit/bptree_stress_test.cc
@@ -0,0 +1,236 @@
+/*
+ * Copyright (C) 2020 ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <seastar/core/app-template.hh>
+#include <seastar/core/thread.hh>
+#include <map>
+#include <vector>
+#include <random>
+#include <string>
+#include <iostream>
+#include <fmt/core.h>
+#include <fmt/ostream.h>
+
+constexpr int TEST_NODE_SIZE = 16; /* node size of the tested tree; defined ahead of the includes below (presumably they rely on it - confirm) */
+
+#include "bptree_key.hh"
+#include "utils/bptree.hh"
+#include "bptree_validation.hh"
+
+using namespace bplus;
+using namespace seastar;
+
+class test_data {
+    int _value;
+public:
+    test_data() : _value(0) {}
+    test_data(test_key& k) : _value(static_cast<int>(k) + 10) {}
+    /* The stored value is always "key + 10", which match_key() verifies */
+    operator unsigned long() const { return _value; }
+    bool match_key(const test_key& k) const { return static_cast<int>(k) + 10 == _value; }
+};
+
+std::ostream& operator<<(std::ostream& os, test_data d) {
+    /* Prints the value through test_data's conversion to unsigned long */
+    return os << (unsigned long)d;
+}
+
+using test_tree = tree<test_key, test_data, test_key_compare, TEST_NODE_SIZE, key_search::both, with_debug::yes>; /* the tree under test */
+using test_node = typename test_tree::node;
+using test_validator = validator<test_key, test_data, test_key_compare, TEST_NODE_SIZE>; /* validator instantiated with the same key, data, comparator and node size */
+using test_iterator_checker = iterator_checker<test_key, test_data, test_key_compare, TEST_NODE_SIZE>; /* stepped alongside the modifications in main() */
+
+int main(int argc, char **argv) { /* stress test: repeatedly fills and empties a tree, validating it and checking it against a std::map */
+ namespace bpo = boost::program_options;
+ app_template app;
+ app.add_options()
+ ("count", bpo::value<int>()->default_value(4132), "number of keys to fill the tree with")
+ ("iters", bpo::value<int>()->default_value(9), "number of iterations")
+ ("keys", bpo::value<std::string>()->default_value("rand"), "how to generate keys (rand, asc, desc)")
+ ("verb", bpo::value<bool>()->default_value(false), "be verbose");
+
+ return app.run(argc, argv, [&app] {
+ auto count = app.configuration()["count"].as<int>();
+ auto iters = app.configuration()["iters"].as<int>();
+ auto ks = app.configuration()["keys"].as<std::string>();
+ auto verb = app.configuration()["verb"].as<bool>();
+
+ return seastar::async([count, iters, ks, verb] {
+ int rep = iters;
+ auto *t = new test_tree(test_key_compare{});
+ std::map<int, unsigned long> oracle; /* reference content: key -> expected value (key + 10) */
+
+ int p = count / 10;
+ if (p == 0) {
+ p = 1;
+ }
+
+ std::vector<int> keys;
+
+ for (int i = 0; i < count; i++) {
+ keys.push_back(i + 1); /* keys are 1..count, ascending */
+ }
+
+ std::random_device rd;
+ std::mt19937 g(rd());
+
+ fmt::print("Inserting {:d} k:v pairs {:d} times\n", count, rep);
+
+ test_validator tv;
+
+ if (ks == "desc") {
+ fmt::print("Reversing keys vector\n");
+ std::reverse(keys.begin(), keys.end());
+ }
+
+ bool shuffle = ks == "rand";
+ if (shuffle) {
+ fmt::print("Will shuffle keys each iteration\n");
+ }
+
+
+ again: /* one iteration: insert all keys, compare with the oracle, erase all keys */
+ auto* itc = new test_iterator_checker(tv, *t);
+
+ if (shuffle) {
+ std::shuffle(keys.begin(), keys.end(), g);
+ }
+
+ for (int i = 0; i < count; i++) {
+ test_key k(keys[i]);
+
+ if (verb) {
+ fmt::print("+++ {}\n", (int)k);
+ }
+
+ if (rep % 2 != 1) { /* alternate between the two insertion paths from iteration to iteration */
+ auto ir = t->emplace(std::move(copy_key(k)), k);
+ assert(ir.second);
+ } else {
+ auto ir = t->lower_bound(k);
+ ir.emplace_before(std::move(copy_key(k)), test_key_compare{}, k);
+ }
+ oracle[keys[i]] = keys[i] + 10;
+
+ if (verb) {
+ fmt::print("Validating\n");
+ tv.print_tree(*t, '|');
+ }
+
+ /* Limit validation rate for many keys */
+ if (i % (i/1000 + 1) == 0) {
+ tv.validate(*t);
+ }
+
+ if (i % 7 == 0) {
+ if (!itc->step()) { /* start over with a fresh checker when step() returns false */
+ delete itc;
+ itc = new test_iterator_checker(tv, *t);
+ }
+ }
+
+ seastar::thread::maybe_yield();
+ }
+
+ auto sz = t->size_slow();
+ if (sz != (size_t)count) {
+ fmt::print("Size {} != count {}\n", sz, count);
+ throw "size";
+ }
+
+ auto ti = t->begin();
+ for (auto oe : oracle) { /* std::map iterates in key order, the tree iterator must yield the same values */
+ if (*ti != oe.second) {
+ fmt::print("Data mismatch {} vs {}\n", oe.second, *ti);
+ throw "oracle";
+ }
+ ti++;
+ }
+
+ if (shuffle) {
+ std::shuffle(keys.begin(), keys.end(), g);
+ }
+
+ for (int i = 0; i < count; i++) {
+ test_key k(keys[i]);
+
+ /*
+ * kill iterator if we're removing what it points to,
+ * otherwise it's not invalidated
+ */
+ if (itc->here(k)) {
+ delete itc;
+ itc = nullptr;
+ }
+
+ if (verb) {
+ fmt::print("--- {}\n", (int)k);
+ }
+
+ if (rep % 3 != 2) { /* every third iteration erases via the iterator instead of by key */
+ t->erase(k);
+ } else {
+ auto ri = t->find(k);
+ auto ni = ri;
+ ni++;
+ auto eni = ri.erase(test_key_compare{});
+ assert(ni == eni); /* iterator erase must return the next element */
+ }
+
+ oracle.erase(keys[i]);
+
+ if (verb) {
+ fmt::print("Validating\n");
+ tv.print_tree(*t, '|');
+ }
+
+ if ((count-i) % ((count-i)/1000 + 1) == 0) { /* same validation rate limit, by the number of keys left */
+ tv.validate(*t);
+ }
+
+ if (itc == nullptr) { /* the checker was killed above, create a new one */
+ itc = new test_iterator_checker(tv, *t);
+ }
+
+ if (i % 5 == 0) {
+ if (!itc->step()) {
+ delete itc;
+ itc = new test_iterator_checker(tv, *t);
+ }
+ }
+
+ seastar::thread::maybe_yield();
+ }
+
+ delete itc;
+
+ if (--rep > 0) {
+ if (verb) {
+ fmt::print("{:d} iterations left\n", rep);
+ }
+ goto again;
+ }
+
+ oracle.clear();
+ delete t;
+ });
+ });
+}
--
2.20.1